mz_storage_client/
storage_collections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::num::NonZeroI64;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use futures::future::BoxFuture;
22use futures::stream::{BoxStream, FuturesUnordered};
23use futures::{Future, FutureExt, StreamExt};
24use itertools::Itertools;
25
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::{EpochMillis, NowFn};
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::Codec64;
40use mz_persist_types::codec_impls::UnitSchema;
41use mz_persist_types::txn::TxnsCodec;
42use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
43use mz_storage_types::StorageDiff;
44use mz_storage_types::configuration::StorageConfiguration;
45use mz_storage_types::connections::ConnectionContext;
46use mz_storage_types::connections::inline::InlinedConnection;
47use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
48use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
49use mz_storage_types::parameters::StorageParameters;
50use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
51use mz_storage_types::read_policy::ReadPolicy;
52use mz_storage_types::sources::{
53    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
54    SourceExportDataConfig, Timeline,
55};
56use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
57use mz_txn_wal::metrics::Metrics as TxnMetrics;
58use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
59use mz_txn_wal::txns::TxnsHandle;
60use timely::PartialOrder;
61use timely::order::TotalOrder;
62use timely::progress::frontier::MutableAntichain;
63use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
64use tokio::sync::{mpsc, oneshot};
65use tokio::time::MissedTickBehavior;
66use tracing::{debug, info, trace, warn};
67
68use crate::client::TimestamplessUpdateBuilder;
69use crate::controller::{
70    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
71};
72use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
73
74mod metrics;
75
76/// An abstraction for keeping track of storage collections and managing access
77/// to them.
78///
79/// Responsibilities:
80///
81/// - Keeps a critical persist handle for holding the since of collections
82///   where it need to be.
83///
84/// - Drives the since forward based on the upper of a collection and a
85///   [ReadPolicy].
86///
87/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
88/// advancing while it needs to be read at a specific time.
89#[async_trait]
90pub trait StorageCollections: Debug {
91    type Timestamp: TimelyTimestamp;
92
93    /// On boot, reconcile this [StorageCollections] with outside state. We get
94    /// a [StorageTxn] where we can record any durable state that we need.
95    ///
96    /// We get `init_ids`, which tells us about all collections that currently
97    /// exist, so that we can record durable state for those that _we_ don't
98    /// know yet about.
99    async fn initialize_state(
100        &self,
101        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
102        init_ids: BTreeSet<GlobalId>,
103    ) -> Result<(), StorageError<Self::Timestamp>>;
104
105    /// Update storage configuration with new parameters.
106    fn update_parameters(&self, config_params: StorageParameters);
107
108    /// Returns the [CollectionMetadata] of the collection identified by `id`.
109    fn collection_metadata(
110        &self,
111        id: GlobalId,
112    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;
113
114    /// Acquire an iterator over [CollectionMetadata] for all active
115    /// collections.
116    ///
117    /// A collection is "active" when it has a non empty frontier of read
118    /// capabilties.
119    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
120
121    /// Returns the frontiers of the identified collection.
122    fn collection_frontiers(
123        &self,
124        id: GlobalId,
125    ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
126        let frontiers = self
127            .collections_frontiers(vec![id])?
128            .expect_element(|| "known to exist");
129
130        Ok(frontiers)
131    }
132
133    /// Atomically gets and returns the frontiers of all the identified
134    /// collections.
135    fn collections_frontiers(
136        &self,
137        id: Vec<GlobalId>,
138    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;
139
140    /// Atomically gets and returns the frontiers of all active collections.
141    ///
142    /// A collection is "active" when it has a non empty frontier of read
143    /// capabilties.
144    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;
145
146    /// Checks whether a collection exists under the given `GlobalId`. Returns
147    /// an error if the collection does not exist.
148    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;
149
150    /// Returns aggregate statistics about the contents of the local input named
151    /// `id` at `as_of`.
152    async fn snapshot_stats(
153        &self,
154        id: GlobalId,
155        as_of: Antichain<Self::Timestamp>,
156    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;
157
158    /// Returns aggregate statistics about the contents of the local input named
159    /// `id` at `as_of`.
160    ///
161    /// Note that this async function itself returns a future. We may
162    /// need to block on the stats being available, but don't want to hold a reference
163    /// to the controller for too long... so the outer future holds a reference to the
164    /// controller but returns quickly, and the inner future is slow but does not
165    /// reference the controller.
166    async fn snapshot_parts_stats(
167        &self,
168        id: GlobalId,
169        as_of: Antichain<Self::Timestamp>,
170    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;
171
172    /// Returns a snapshot of the contents of collection `id` at `as_of`.
173    fn snapshot(
174        &self,
175        id: GlobalId,
176        as_of: Self::Timestamp,
177    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;
178
179    /// Returns a snapshot of the contents of collection `id` at the largest
180    /// readable `as_of`.
181    async fn snapshot_latest(
182        &self,
183        id: GlobalId,
184    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;
185
186    /// Returns a snapshot of the contents of collection `id` at `as_of`.
187    fn snapshot_cursor(
188        &self,
189        id: GlobalId,
190        as_of: Self::Timestamp,
191    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
192    where
193        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;
194
195    /// Generates a snapshot of the contents of collection `id` at `as_of` and
196    /// streams out all of the updates in bounded memory.
197    ///
198    /// The output is __not__ consolidated.
199    fn snapshot_and_stream(
200        &self,
201        id: GlobalId,
202        as_of: Self::Timestamp,
203    ) -> BoxFuture<
204        'static,
205        Result<
206            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
207            StorageError<Self::Timestamp>,
208        >,
209    >;
210
211    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
212    /// updates for the provided [`GlobalId`].
213    fn create_update_builder(
214        &self,
215        id: GlobalId,
216    ) -> BoxFuture<
217        'static,
218        Result<
219            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
220            StorageError<Self::Timestamp>,
221        >,
222    >
223    where
224        Self::Timestamp: Lattice + Codec64;
225
226    /// Update the given [`StorageTxn`] with the appropriate metadata given the
227    /// IDs to add and drop.
228    ///
229    /// The data modified in the `StorageTxn` must be made available in all
230    /// subsequent calls that require [`StorageMetadata`] as a parameter.
231    async fn prepare_state(
232        &self,
233        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
234        ids_to_add: BTreeSet<GlobalId>,
235        ids_to_drop: BTreeSet<GlobalId>,
236        ids_to_register: BTreeMap<GlobalId, ShardId>,
237    ) -> Result<(), StorageError<Self::Timestamp>>;
238
239    /// Create the collections described by the individual
240    /// [CollectionDescriptions](CollectionDescription).
241    ///
242    /// Each command carries the source id, the source description, and any
243    /// associated metadata needed to ingest the particular source.
244    ///
245    /// This command installs collection state for the indicated sources, and
246    /// they are now valid to use in queries at times beyond the initial `since`
247    /// frontiers. Each collection also acquires a read capability at this
248    /// frontier, which will need to be repeatedly downgraded with
249    /// `allow_compaction()` to permit compaction.
250    ///
251    /// This method is NOT idempotent; It can fail between processing of
252    /// different collections and leave the [StorageCollections] in an
253    /// inconsistent state. It is almost always wrong to do anything but abort
254    /// the process on `Err`.
255    ///
256    /// The `register_ts` is used as the initial timestamp that tables are
257    /// available for reads. (We might later give non-tables the same treatment,
258    /// but hold off on that initially.) Callers must provide a Some if any of
259    /// the collections is a table. A None may be given if none of the
260    /// collections are a table (i.e. all materialized views, sources, etc).
261    ///
262    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
263    /// from the txn-wal sub-system.
264    async fn create_collections_for_bootstrap(
265        &self,
266        storage_metadata: &StorageMetadata,
267        register_ts: Option<Self::Timestamp>,
268        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
269        migrated_storage_collections: &BTreeSet<GlobalId>,
270    ) -> Result<(), StorageError<Self::Timestamp>>;
271
272    /// Alters the identified ingestion to use the provided [`SourceDesc`].
273    ///
274    /// NOTE: Ideally, [StorageCollections] would not care about these, but we
275    /// have to learn about changes such that when new subsources are created we
276    /// can correctly determine a since based on its depenencies' sinces. This
277    /// is really only relevant because newly created subsources depend on the
278    /// remap shard, and we can't just have them start at since 0.
279    async fn alter_ingestion_source_desc(
280        &self,
281        ingestion_id: GlobalId,
282        source_desc: SourceDesc,
283    ) -> Result<(), StorageError<Self::Timestamp>>;
284
285    /// Alters the data config for the specified source exports of the specified ingestions.
286    async fn alter_ingestion_export_data_configs(
287        &self,
288        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
289    ) -> Result<(), StorageError<Self::Timestamp>>;
290
291    /// Alters each identified collection to use the correlated
292    /// [`GenericSourceConnection`].
293    ///
294    /// See NOTE on [StorageCollections::alter_ingestion_source_desc].
295    async fn alter_ingestion_connections(
296        &self,
297        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
298    ) -> Result<(), StorageError<Self::Timestamp>>;
299
300    /// Updates the [`RelationDesc`] for the specified table.
301    async fn alter_table_desc(
302        &self,
303        existing_collection: GlobalId,
304        new_collection: GlobalId,
305        new_desc: RelationDesc,
306        expected_version: RelationVersion,
307    ) -> Result<(), StorageError<Self::Timestamp>>;
308
309    /// Drops the read capability for the sources and allows their resources to
310    /// be reclaimed.
311    ///
312    /// TODO(jkosh44): This method does not validate the provided identifiers.
313    /// Currently when the controller starts/restarts it has no durable state.
314    /// That means that it has no way of remembering any past commands sent. In
315    /// the future we plan on persisting state for the controller so that it is
316    /// aware of past commands. Therefore this method is for dropping sources
317    /// that we know to have been previously created, but have been forgotten by
318    /// the controller due to a restart. Once command history becomes durable we
319    /// can remove this method and use the normal `drop_sources`.
320    fn drop_collections_unvalidated(
321        &self,
322        storage_metadata: &StorageMetadata,
323        identifiers: Vec<GlobalId>,
324    );
325
326    /// Assigns a read policy to specific identifiers.
327    ///
328    /// The policies are assigned in the order presented, and repeated
329    /// identifiers should conclude with the last policy. Changing a policy will
330    /// immediately downgrade the read capability if appropriate, but it will
331    /// not "recover" the read capability if the prior capability is already
332    /// ahead of it.
333    ///
334    /// This [StorageCollections] may include its own overrides on these
335    /// policies.
336    ///
337    /// Identifiers not present in `policies` retain their existing read
338    /// policies.
339    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);
340
341    /// Acquires and returns the earliest possible read holds for the specified
342    /// collections.
343    fn acquire_read_holds(
344        &self,
345        desired_holds: Vec<GlobalId>,
346    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;
347
348    /// Get the time dependence for a storage collection. Returns no value if unknown or if
349    /// the object isn't managed by storage.
350    fn determine_time_dependence(
351        &self,
352        id: GlobalId,
353    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
354}
355
356/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
357/// consolidated form.
358pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
359    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
360    // least as long as the cursor itself, which holds part leases. Bundling them together!
361    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
362    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
363}
364
365impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
366    pub async fn next(
367        &mut self,
368    ) -> Option<
369        impl Iterator<
370            Item = (
371                (Result<SourceData, String>, Result<(), String>),
372                T,
373                StorageDiff,
374            ),
375        > + Sized
376        + '_,
377    > {
378        self.cursor.next().await
379    }
380}
381
382/// Frontiers of the collection identified by `id`.
383#[derive(Debug)]
384pub struct CollectionFrontiers<T> {
385    /// The [GlobalId] of the collection that these frontiers belong to.
386    pub id: GlobalId,
387
388    /// The upper/write frontier of the collection.
389    pub write_frontier: Antichain<T>,
390
391    /// The since frontier that is implied by the collection's existence,
392    /// disregarding any read holds.
393    ///
394    /// Concretely, it is the since frontier that is implied by the combination
395    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
396    /// derived from the write frontier using the [ReadPolicy].
397    pub implied_capability: Antichain<T>,
398
399    /// The frontier of all oustanding [ReadHolds](ReadHold). This includes the
400    /// implied capability.
401    pub read_capabilities: Antichain<T>,
402}
403
404/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
405/// background task for doing work concurrently, in the background.
406#[derive(Debug, Clone)]
407pub struct StorageCollectionsImpl<
408    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
409> {
410    /// The fencing token for this instance of [StorageCollections], and really
411    /// all of the controllers and Coordinator.
412    envd_epoch: NonZeroI64,
413
414    /// Whether or not this [StorageCollections] is in read-only mode.
415    ///
416    /// When in read-only mode, we are not allowed to affect changes to external
417    /// systems, including, for example, acquiring and downgrading critical
418    /// [SinceHandles](SinceHandle)
419    read_only: bool,
420
421    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
422    /// been persisted by the caller of [StorageCollections::prepare_state].
423    finalizable_shards: Arc<ShardIdSet>,
424
425    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
426    /// shards here until we are given a chance to let our callers know that
427    /// these have been finalized, for example via
428    /// [StorageCollections::prepare_state].
429    finalized_shards: Arc<ShardIdSet>,
430
431    /// Collections maintained by this [StorageCollections].
432    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
433
434    /// A shared TxnsCache running in a task and communicated with over a channel.
435    txns_read: TxnsRead<T>,
436
437    /// Storage configuration parameters.
438    config: Arc<Mutex<StorageConfiguration>>,
439
440    /// The upper of the txn shard as it was when we booted. We forward the
441    /// upper of created/registered tables to make sure that their uppers are at
442    /// least not less than the initially known txn upper.
443    ///
444    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
445    /// existing indexes when bootstrapping, where tables that have an upper
446    /// that is less than the initially known txn upper can lead to indexes that
447    /// cannot hydrate in read-only mode.
448    initial_txn_upper: Antichain<T>,
449
450    /// The persist location where all storage collections are being written to
451    persist_location: PersistLocation,
452
453    /// A persist client used to write to storage collections
454    persist: Arc<PersistClientCache>,
455
456    /// For sending commands to our internal task.
457    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
458
459    /// For sending updates about read holds to our internal task.
460    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
461
462    /// Handles to tasks we own, making sure they're dropped when we are.
463    _background_task: Arc<AbortOnDropHandle<()>>,
464    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
465}
466
467// Supporting methods for implementing [StorageCollections].
468//
469// Almost all internal methods that are the backing implementation for a trait
470// method have the `_inner` suffix.
471//
472// We follow a pattern where `_inner` methods get a mutable reference to the
473// shared collections state, and it's the public-facing method that locks the
474// state for the duration of its invocation. This allows calling other `_inner`
475// methods from within `_inner` methods.
476impl<T> StorageCollectionsImpl<T>
477where
478    T: TimelyTimestamp
479        + Lattice
480        + Codec64
481        + From<EpochMillis>
482        + TimestampManipulation
483        + Into<mz_repr::Timestamp>
484        + Sync,
485{
486    /// Creates and returns a new [StorageCollections].
487    ///
488    /// Note that when creating a new [StorageCollections], you must also
489    /// reconcile it with the previous state using
490    /// [StorageCollections::initialize_state],
491    /// [StorageCollections::prepare_state], and
492    /// [StorageCollections::create_collections_for_bootstrap].
493    pub async fn new(
494        persist_location: PersistLocation,
495        persist_clients: Arc<PersistClientCache>,
496        metrics_registry: &MetricsRegistry,
497        _now: NowFn,
498        txns_metrics: Arc<TxnMetrics>,
499        envd_epoch: NonZeroI64,
500        read_only: bool,
501        connection_context: ConnectionContext,
502        txn: &dyn StorageTxn<T>,
503    ) -> Self {
504        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);
505
506        // This value must be already installed because we must ensure it's
507        // durably recorded before it is used, otherwise we risk leaking persist
508        // state.
509        let txns_id = txn
510            .get_txn_wal_shard()
511            .expect("must call prepare initialization before creating StorageCollections");
512
513        let txns_client = persist_clients
514            .open(persist_location.clone())
515            .await
516            .expect("location should be valid");
517
518        // We have to initialize, so that TxnsRead::start() below does not
519        // block.
520        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
521            TxnsHandle::open(
522                T::minimum(),
523                txns_client.clone(),
524                txns_client.dyncfgs().clone(),
525                Arc::clone(&txns_metrics),
526                txns_id,
527            )
528            .await;
529
530        // For handing to the background task, for listening to upper updates.
531        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
532        let mut txns_write = txns_client
533            .open_writer(
534                txns_id,
535                Arc::new(txns_key_schema),
536                Arc::new(txns_val_schema),
537                Diagnostics {
538                    shard_name: "txns".to_owned(),
539                    handle_purpose: "commit txns".to_owned(),
540                },
541            )
542            .await
543            .expect("txns schema shouldn't change");
544
545        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
546
547        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
548        let finalizable_shards =
549            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
550        let finalized_shards =
551            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
552        let config = Arc::new(Mutex::new(StorageConfiguration::new(
553            connection_context,
554            mz_dyncfgs::all_dyncfgs(),
555        )));
556
557        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();
558
559        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
560        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
561        let mut background_task = BackgroundTask {
562            config: Arc::clone(&config),
563            cmds_tx: cmd_tx.clone(),
564            cmds_rx: cmd_rx,
565            holds_rx,
566            collections: Arc::clone(&collections),
567            finalizable_shards: Arc::clone(&finalizable_shards),
568            shard_by_id: BTreeMap::new(),
569            since_handles: BTreeMap::new(),
570            txns_handle: Some(txns_write),
571            txns_shards: Default::default(),
572        };
573
574        let background_task =
575            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
576                background_task.run().await
577            });
578
579        let finalize_shards_task = mz_ore::task::spawn(
580            || "storage_collections::finalize_shards_task",
581            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
582                envd_epoch: envd_epoch.clone(),
583                config: Arc::clone(&config),
584                metrics,
585                finalizable_shards: Arc::clone(&finalizable_shards),
586                finalized_shards: Arc::clone(&finalized_shards),
587                persist_location: persist_location.clone(),
588                persist: Arc::clone(&persist_clients),
589                read_only,
590            }),
591        );
592
593        Self {
594            finalizable_shards,
595            finalized_shards,
596            collections,
597            txns_read,
598            envd_epoch,
599            read_only,
600            config,
601            initial_txn_upper,
602            persist_location,
603            persist: persist_clients,
604            cmd_tx,
605            holds_tx,
606            _background_task: Arc::new(background_task.abort_on_drop()),
607            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
608        }
609    }
610
611    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
612    ///
613    /// `since` is an optional since that the read handle will be forwarded to
614    /// if it is less than its current since.
615    ///
616    /// This will `halt!` the process if we cannot successfully acquire a
617    /// critical handle with our current epoch.
618    async fn open_data_handles(
619        &self,
620        id: &GlobalId,
621        shard: ShardId,
622        since: Option<&Antichain<T>>,
623        relation_desc: RelationDesc,
624        persist_client: &PersistClient,
625    ) -> (
626        WriteHandle<SourceData, (), T, StorageDiff>,
627        SinceHandleWrapper<T>,
628    ) {
629        let since_handle = if self.read_only {
630            let read_handle = self
631                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
632                .await;
633            SinceHandleWrapper::Leased(read_handle)
634        } else {
635            let since_handle = self
636                .open_critical_handle(id, shard, since, persist_client)
637                .await;
638
639            SinceHandleWrapper::Critical(since_handle)
640        };
641
642        let mut write_handle = self
643            .open_write_handle(id, shard, relation_desc, persist_client)
644            .await;
645
646        // N.B.
647        // Fetch the most recent upper for the write handle. Otherwise, this may
648        // be behind the since of the since handle. Its vital this happens AFTER
649        // we create the since handle as it needs to be linearized with that
650        // operation. It may be true that creating the write handle after the
651        // since handle already ensures this, but we do this out of an abundance
652        // of caution.
653        //
654        // Note that this returns the upper, but also sets it on the handle to
655        // be fetched later.
656        write_handle.fetch_recent_upper().await;
657
658        (write_handle, since_handle)
659    }
660
661    /// Opens a write handle for the given `shard`.
662    async fn open_write_handle(
663        &self,
664        id: &GlobalId,
665        shard: ShardId,
666        relation_desc: RelationDesc,
667        persist_client: &PersistClient,
668    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
669        let diagnostics = Diagnostics {
670            shard_name: id.to_string(),
671            handle_purpose: format!("controller data for {}", id),
672        };
673
674        let write = persist_client
675            .open_writer(
676                shard,
677                Arc::new(relation_desc),
678                Arc::new(UnitSchema),
679                diagnostics.clone(),
680            )
681            .await
682            .expect("invalid persist usage");
683
684        write
685    }
686
687    /// Opens a critical since handle for the given `shard`.
688    ///
689    /// `since` is an optional since that the read handle will be forwarded to
690    /// if it is less than its current since.
691    ///
692    /// This will `halt!` the process if we cannot successfully acquire a
693    /// critical handle with our current epoch.
694    async fn open_critical_handle(
695        &self,
696        id: &GlobalId,
697        shard: ShardId,
698        since: Option<&Antichain<T>>,
699        persist_client: &PersistClient,
700    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
701        tracing::debug!(%id, ?since, "opening critical handle");
702
703        assert!(
704            !self.read_only,
705            "attempting to open critical SinceHandle in read-only mode"
706        );
707
708        let diagnostics = Diagnostics {
709            shard_name: id.to_string(),
710            handle_purpose: format!("controller data for {}", id),
711        };
712
713        // Construct the handle in a separate block to ensure all error paths
714        // are diverging
715        let since_handle = {
716            // This block's aim is to ensure the handle is in terms of our epoch
717            // by the time we return it.
718            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
719                .open_critical_since(
720                    shard,
721                    PersistClient::CONTROLLER_CRITICAL_SINCE,
722                    diagnostics.clone(),
723                )
724                .await
725                .expect("invalid persist usage");
726
727            // Take the join of the handle's since and the provided `since`;
728            // this lets materialized views express the since at which their
729            // read handles "start."
730            let provided_since = match since {
731                Some(since) => since,
732                None => &Antichain::from_elem(T::minimum()),
733            };
734            let since = handle.since().join(provided_since);
735
736            let our_epoch = self.envd_epoch;
737
738            loop {
739                let current_epoch: PersistEpoch = handle.opaque().clone();
740
741                // Ensure the current epoch is <= our epoch.
742                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);
743
744                if unchecked_success {
745                    // Update the handle's state so that it is in terms of our
746                    // epoch.
747                    let checked_success = handle
748                        .compare_and_downgrade_since(
749                            &current_epoch,
750                            (&PersistEpoch::from(our_epoch), &since),
751                        )
752                        .await
753                        .is_ok();
754                    if checked_success {
755                        break handle;
756                    }
757                } else {
758                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
759                }
760            }
761        };
762
763        since_handle
764    }
765
766    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
767    /// for the given `shard`.
768    ///
769    /// `since` is an optional since that the read handle will be forwarded to
770    /// if it is less than its current since.
771    async fn open_leased_handle(
772        &self,
773        id: &GlobalId,
774        shard: ShardId,
775        relation_desc: RelationDesc,
776        since: Option<&Antichain<T>>,
777        persist_client: &PersistClient,
778    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
779        tracing::debug!(%id, ?since, "opening leased handle");
780
781        let diagnostics = Diagnostics {
782            shard_name: id.to_string(),
783            handle_purpose: format!("controller data for {}", id),
784        };
785
786        let use_critical_since = false;
787        let mut handle: ReadHandle<_, _, _, _> = persist_client
788            .open_leased_reader(
789                shard,
790                Arc::new(relation_desc),
791                Arc::new(UnitSchema),
792                diagnostics.clone(),
793                use_critical_since,
794            )
795            .await
796            .expect("invalid persist usage");
797
798        // Take the join of the handle's since and the provided `since`;
799        // this lets materialized views express the since at which their
800        // read handles "start."
801        let provided_since = match since {
802            Some(since) => since,
803            None => &Antichain::from_elem(T::minimum()),
804        };
805        let since = handle.since().join(provided_since);
806
807        handle.downgrade_since(&since).await;
808
809        handle
810    }
811
812    fn register_handles(
813        &self,
814        id: GlobalId,
815        is_in_txns: bool,
816        since_handle: SinceHandleWrapper<T>,
817        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
818    ) {
819        self.send(BackgroundCmd::Register {
820            id,
821            is_in_txns,
822            since_handle,
823            write_handle,
824        });
825    }
826
827    fn send(&self, cmd: BackgroundCmd<T>) {
828        let _ = self.cmd_tx.send(cmd);
829    }
830
831    async fn snapshot_stats_inner(
832        &self,
833        id: GlobalId,
834        as_of: SnapshotStatsAsOf<T>,
835    ) -> Result<SnapshotStats, StorageError<T>> {
836        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
837        // caller of this one drives it to completion.
838        //
839        // We'd need to either share the critical handle somehow or maybe have
840        // two instances around, one in the worker and one in the
841        // StorageCollections.
842        let (tx, rx) = oneshot::channel();
843        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
844        rx.await.expect("BackgroundTask should be live").0.await
845    }
846
847    /// If this identified collection has a dependency, install a read hold on
848    /// it.
849    ///
850    /// This is necessary to ensure that the dependency's since does not advance
851    /// beyond its dependents'.
852    fn install_collection_dependency_read_holds_inner(
853        &self,
854        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
855        id: GlobalId,
856    ) -> Result<(), StorageError<T>> {
857        let (deps, collection_implied_capability) = match self_collections.get(&id) {
858            Some(CollectionState {
859                storage_dependencies: deps,
860                implied_capability,
861                ..
862            }) => (deps.clone(), implied_capability),
863            _ => return Ok(()),
864        };
865
866        for dep in deps.iter() {
867            let dep_collection = self_collections
868                .get(dep)
869                .ok_or(StorageError::IdentifierMissing(id))?;
870
871            mz_ore::soft_assert_or_log!(
872                PartialOrder::less_equal(
873                    &dep_collection.implied_capability,
874                    collection_implied_capability
875                ),
876                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
877                dep_collection.implied_capability,
878                collection_implied_capability,
879            );
880        }
881
882        self.install_read_capabilities_inner(
883            self_collections,
884            id,
885            &deps,
886            collection_implied_capability.clone(),
887        )?;
888
889        Ok(())
890    }
891
892    /// Determine if this collection has another dependency.
893    ///
894    /// Currently, collections have either 0 or 1 dependencies.
895    fn determine_collection_dependencies(
896        &self,
897        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
898        data_source: &DataSource<T>,
899    ) -> Result<Vec<GlobalId>, StorageError<T>> {
900        let dependencies = match &data_source {
901            DataSource::Introspection(_)
902            | DataSource::Webhook
903            | DataSource::Table { primary: None }
904            | DataSource::Progress
905            | DataSource::Other => Vec::new(),
906            DataSource::Table {
907                primary: Some(primary),
908            } => vec![*primary],
909            DataSource::IngestionExport {
910                ingestion_id,
911                data_config,
912                ..
913            } => {
914                // Ingestion exports depend on their primary source's remap
915                // collection, except when they use a CDCv2 envelope.
916                let source = self_collections
917                    .get(ingestion_id)
918                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
919                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
920                    panic!("SourceExport must refer to a primary source that already exists");
921                };
922
923                match data_config.envelope {
924                    SourceEnvelope::CdcV2 => Vec::new(),
925                    _ => vec![ingestion.remap_collection_id],
926                }
927            }
928            // Ingestions depend on their remap collection.
929            DataSource::Ingestion(ingestion) => vec![ingestion.remap_collection_id],
930            DataSource::Sink { desc } => vec![desc.sink.from],
931        };
932
933        Ok(dependencies)
934    }
935
936    /// Install read capabilities on the given `storage_dependencies`.
937    #[instrument(level = "debug")]
938    fn install_read_capabilities_inner(
939        &self,
940        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
941        from_id: GlobalId,
942        storage_dependencies: &[GlobalId],
943        read_capability: Antichain<T>,
944    ) -> Result<(), StorageError<T>> {
945        let mut changes = ChangeBatch::new();
946        for time in read_capability.iter() {
947            changes.update(time.clone(), 1);
948        }
949
950        if tracing::span_enabled!(tracing::Level::TRACE) {
951            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
952            let user_capabilities = self_collections
953                .iter_mut()
954                .filter(|(id, _c)| id.is_user())
955                .map(|(id, c)| {
956                    let updates = c.read_capabilities.updates().cloned().collect_vec();
957                    (*id, c.implied_capability.clone(), updates)
958                })
959                .collect_vec();
960
961            trace!(
962                %from_id,
963                ?storage_dependencies,
964                ?read_capability,
965                ?user_capabilities,
966                "install_read_capabilities_inner");
967        }
968
969        let mut storage_read_updates = storage_dependencies
970            .iter()
971            .map(|id| (*id, changes.clone()))
972            .collect();
973
974        StorageCollectionsImpl::update_read_capabilities_inner(
975            &self.cmd_tx,
976            self_collections,
977            &mut storage_read_updates,
978        );
979
980        if tracing::span_enabled!(tracing::Level::TRACE) {
981            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
982            let user_capabilities = self_collections
983                .iter_mut()
984                .filter(|(id, _c)| id.is_user())
985                .map(|(id, c)| {
986                    let updates = c.read_capabilities.updates().cloned().collect_vec();
987                    (*id, c.implied_capability.clone(), updates)
988                })
989                .collect_vec();
990
991            trace!(
992                %from_id,
993                ?storage_dependencies,
994                ?read_capability,
995                ?user_capabilities,
996                "after install_read_capabilities_inner!");
997        }
998
999        Ok(())
1000    }
1001
1002    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
1003        let metadata = &self.collection_metadata(id)?;
1004        let persist_client = self
1005            .persist
1006            .open(metadata.persist_location.clone())
1007            .await
1008            .unwrap();
1009        // Duplicate part of open_data_handles here because we don't need the
1010        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
1011        let diagnostics = Diagnostics {
1012            shard_name: id.to_string(),
1013            handle_purpose: format!("controller data for {}", id),
1014        };
1015        // NB: Opening a WriteHandle is cheap if it's never used in a
1016        // compare_and_append operation.
1017        let write = persist_client
1018            .open_writer::<SourceData, (), T, StorageDiff>(
1019                metadata.data_shard,
1020                Arc::new(metadata.relation_desc.clone()),
1021                Arc::new(UnitSchema),
1022                diagnostics.clone(),
1023            )
1024            .await
1025            .expect("invalid persist usage");
1026        Ok(write.shared_upper())
1027    }
1028
1029    async fn read_handle_for_snapshot(
1030        persist: Arc<PersistClientCache>,
1031        metadata: &CollectionMetadata,
1032        id: GlobalId,
1033    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1034        let persist_client = persist
1035            .open(metadata.persist_location.clone())
1036            .await
1037            .unwrap();
1038
1039        // We create a new read handle every time someone requests a snapshot
1040        // and then immediately expire it instead of keeping a read handle
1041        // permanently in our state to avoid having it heartbeat continually.
1042        // The assumption is that calls to snapshot are rare and therefore worth
1043        // it to always create a new handle.
1044        let read_handle = persist_client
1045            .open_leased_reader::<SourceData, (), _, _>(
1046                metadata.data_shard,
1047                Arc::new(metadata.relation_desc.clone()),
1048                Arc::new(UnitSchema),
1049                Diagnostics {
1050                    shard_name: id.to_string(),
1051                    handle_purpose: format!("snapshot {}", id),
1052                },
1053                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1054            )
1055            .await
1056            .expect("invalid persist usage");
1057        Ok(read_handle)
1058    }
1059
1060    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1061    // where the as_of frontier might have multiple elements. In the current form the mutually
1062    // incomparable updates will be accumulated together to a state of the collection that never
1063    // actually existed. We should include the original time in the updates advanced by the as_of
1064    // frontier in the result and let the caller decide what to do with the information.
1065    fn snapshot(
1066        &self,
1067        id: GlobalId,
1068        as_of: T,
1069        txns_read: &TxnsRead<T>,
1070    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
1071    where
1072        T: Codec64 + From<EpochMillis> + TimestampManipulation,
1073    {
1074        let metadata = match self.collection_metadata(id) {
1075            Ok(metadata) => metadata.clone(),
1076            Err(e) => return async { Err(e) }.boxed(),
1077        };
1078        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1079            assert_eq!(txns_id, txns_read.txns_id());
1080            txns_read.clone()
1081        });
1082        let persist = Arc::clone(&self.persist);
1083        async move {
1084            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1085            let contents = match txns_read {
1086                None => {
1087                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1088                    read_handle
1089                        .snapshot_and_fetch(Antichain::from_elem(as_of))
1090                        .await
1091                }
1092                Some(txns_read) => {
1093                    // We _are_ using txn-wal for tables. It advances the physical upper of the
1094                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
1095                    // unblocked.
1096                    //
1097                    // Consider the following scenario:
1098                    // - Table A is written to via txns at time 5
1099                    // - Tables other than A are written to via txns consuming timestamps up to 10
1100                    // - We'd like to read A at 7
1101                    // - The application process of A's txn has advanced the upper to 5+1, but we need
1102                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
1103                    // - This branch allows it to handle that advancing the physical upper of Table A to
1104                    //   10 (NB but only once we see it get past the write at 5!)
1105                    // - Then we can read it normally.
1106                    txns_read.update_gt(as_of.clone()).await;
1107                    let data_snapshot = txns_read
1108                        .data_snapshot(metadata.data_shard, as_of.clone())
1109                        .await;
1110                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
1111                }
1112            };
1113            match contents {
1114                Ok(contents) => {
1115                    let mut snapshot = Vec::with_capacity(contents.len());
1116                    for ((data, _), _, diff) in contents {
1117                        // TODO(petrosagg): We should accumulate the errors too and let the user
1118                        // interprret the result
1119                        let row = data.expect("invalid protobuf data").0?;
1120                        snapshot.push((row, diff));
1121                    }
1122                    Ok(snapshot)
1123                }
1124                Err(_) => Err(StorageError::ReadBeforeSince(id)),
1125            }
1126        }
1127        .boxed()
1128    }
1129
1130    fn snapshot_and_stream(
1131        &self,
1132        id: GlobalId,
1133        as_of: T,
1134        txns_read: &TxnsRead<T>,
1135    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
1136    {
1137        use futures::stream::StreamExt;
1138
1139        let metadata = match self.collection_metadata(id) {
1140            Ok(metadata) => metadata.clone(),
1141            Err(e) => return async { Err(e) }.boxed(),
1142        };
1143        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1144            assert_eq!(txns_id, txns_read.txns_id());
1145            txns_read.clone()
1146        });
1147        let persist = Arc::clone(&self.persist);
1148
1149        async move {
1150            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1151            let stream = match txns_read {
1152                None => {
1153                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1154                    read_handle
1155                        .snapshot_and_stream(Antichain::from_elem(as_of))
1156                        .await
1157                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1158                        .boxed()
1159                }
1160                Some(txns_read) => {
1161                    txns_read.update_gt(as_of.clone()).await;
1162                    let data_snapshot = txns_read
1163                        .data_snapshot(metadata.data_shard, as_of.clone())
1164                        .await;
1165                    data_snapshot
1166                        .snapshot_and_stream(&mut read_handle)
1167                        .await
1168                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1169                        .boxed()
1170                }
1171            };
1172
1173            // Map our stream, unwrapping Persist internal errors.
1174            let stream = stream
1175                .map(|((k, _v), t, d)| {
1176                    // TODO(parkmycar): We should accumulate the errors and pass them on to the
1177                    // caller.
1178                    let data = k.expect("error while streaming from Persist");
1179                    (data, t, d)
1180                })
1181                .boxed();
1182            Ok(stream)
1183        }
1184        .boxed()
1185    }
1186
1187    fn set_read_policies_inner(
1188        &self,
1189        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1190        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1191    ) {
1192        trace!("set_read_policies: {:?}", policies);
1193
1194        let mut read_capability_changes = BTreeMap::default();
1195
1196        for (id, policy) in policies.into_iter() {
1197            let collection = match collections.get_mut(&id) {
1198                Some(c) => c,
1199                None => {
1200                    panic!("Reference to absent collection {id}");
1201                }
1202            };
1203
1204            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1205
1206            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1207                let mut update = ChangeBatch::new();
1208                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1209                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1210                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1211                if !update.is_empty() {
1212                    read_capability_changes.insert(id, update);
1213                }
1214            }
1215
1216            collection.read_policy = policy;
1217        }
1218
1219        for (id, changes) in read_capability_changes.iter() {
1220            if id.is_user() {
1221                trace!(%id, ?changes, "in set_read_policies, capability changes");
1222            }
1223        }
1224
1225        if !read_capability_changes.is_empty() {
1226            StorageCollectionsImpl::update_read_capabilities_inner(
1227                &self.cmd_tx,
1228                collections,
1229                &mut read_capability_changes,
1230            );
1231        }
1232    }
1233
1234    // This is not an associated function so that we can share it with the task
1235    // that updates the persist handles and also has a reference to the shared
1236    // collections state.
1237    fn update_read_capabilities_inner(
1238        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
1239        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1240        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
1241    ) {
1242        // Location to record consequences that we need to act on.
1243        let mut collections_net = BTreeMap::new();
1244
1245        // We must not rely on any specific relative ordering of `GlobalId`s.
1246        // That said, it is reasonable to assume that collections generally have
1247        // greater IDs than their dependencies, so starting with the largest is
1248        // a useful optimization.
1249        while let Some(id) = updates.keys().rev().next().cloned() {
1250            let mut update = updates.remove(&id).unwrap();
1251
1252            if id.is_user() {
1253                trace!(id = ?id, update = ?update, "update_read_capabilities");
1254            }
1255
1256            let collection = if let Some(c) = collections.get_mut(&id) {
1257                c
1258            } else {
1259                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
1260                if has_positive_updates {
1261                    panic!(
1262                        "reference to absent collection {id} but we have positive updates: {:?}",
1263                        update
1264                    );
1265                } else {
1266                    // Continue purely negative updates. Someone has probably
1267                    // already dropped this collection!
1268                    continue;
1269                }
1270            };
1271
1272            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
1273            for (time, diff) in update.iter() {
1274                assert!(
1275                    collection.read_capabilities.count_for(time) + diff >= 0,
1276                    "update {:?} for collection {id} would lead to negative \
1277                        read capabilities, read capabilities before applying: {:?}",
1278                    update,
1279                    collection.read_capabilities
1280                );
1281
1282                if collection.read_capabilities.count_for(time) + diff > 0 {
1283                    assert!(
1284                        current_read_capabilities.less_equal(time),
1285                        "update {:?} for collection {id} is trying to \
1286                            install read capabilities before the current \
1287                            frontier of read capabilities, read capabilities before applying: {:?}",
1288                        update,
1289                        collection.read_capabilities
1290                    );
1291                }
1292            }
1293
1294            let changes = collection.read_capabilities.update_iter(update.drain());
1295            update.extend(changes);
1296
1297            if id.is_user() {
1298                trace!(
1299                %id,
1300                ?collection.storage_dependencies,
1301                ?update,
1302                "forwarding update to storage dependencies");
1303            }
1304
1305            for id in collection.storage_dependencies.iter() {
1306                updates
1307                    .entry(*id)
1308                    .or_insert_with(ChangeBatch::new)
1309                    .extend(update.iter().cloned());
1310            }
1311
1312            let (changes, frontier) = collections_net
1313                .entry(id)
1314                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));
1315
1316            changes.extend(update.drain());
1317            *frontier = collection.read_capabilities.frontier().to_owned();
1318        }
1319
1320        // Translate our net compute actions into downgrades of persist sinces.
1321        // The actual downgrades are performed by a Tokio task asynchronously.
1322        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
1323        for (key, (mut changes, frontier)) in collections_net {
1324            if !changes.is_empty() {
1325                // If the table has a "primary" collection, let that collection drive compaction.
1326                let collection = collections.get(&key).expect("must still exist");
1327                let should_emit_persist_compaction = !matches!(
1328                    collection.description.data_source,
1329                    DataSource::Table { primary: Some(_) }
1330                );
1331
1332                if frontier.is_empty() {
1333                    info!(id = %key, "removing collection state because the since advanced to []!");
1334                    collections.remove(&key).expect("must still exist");
1335                }
1336
1337                if should_emit_persist_compaction {
1338                    persist_compaction_commands.push((key, frontier));
1339                }
1340            }
1341        }
1342
1343        if !persist_compaction_commands.is_empty() {
1344            cmd_tx
1345                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
1346                .expect("cannot fail to send");
1347        }
1348    }
1349
1350    /// Remove any shards that we know are finalized
1351    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1352        self.finalized_shards
1353            .lock()
1354            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1355    }
1356}
1357
1358// See comments on the above impl for StorageCollectionsImpl.
1359#[async_trait]
1360impl<T> StorageCollections for StorageCollectionsImpl<T>
1361where
1362    T: TimelyTimestamp
1363        + Lattice
1364        + Codec64
1365        + From<EpochMillis>
1366        + TimestampManipulation
1367        + Into<mz_repr::Timestamp>
1368        + Sync,
1369{
1370    type Timestamp = T;
1371
1372    async fn initialize_state(
1373        &self,
1374        txn: &mut (dyn StorageTxn<T> + Send),
1375        init_ids: BTreeSet<GlobalId>,
1376    ) -> Result<(), StorageError<T>> {
1377        let metadata = txn.get_collection_metadata();
1378        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1379
1380        // Determine which collections we do not yet have metadata for.
1381        let new_collections: BTreeSet<GlobalId> =
1382            init_ids.difference(&existing_metadata).cloned().collect();
1383
1384        self.prepare_state(
1385            txn,
1386            new_collections,
1387            BTreeSet::default(),
1388            BTreeMap::default(),
1389        )
1390        .await?;
1391
1392        // All shards that belong to collections dropped in the last epoch are
1393        // eligible for finalization.
1394        //
1395        // n.b. this introduces an unlikely race condition: if a collection is
1396        // dropped from the catalog, but the dataflow is still running on a
1397        // worker, assuming the shard is safe to finalize on reboot may cause
1398        // the cluster to panic.
1399        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1400
1401        info!(?unfinalized_shards, "initializing finalizable_shards");
1402
1403        self.finalizable_shards.lock().extend(unfinalized_shards);
1404
1405        Ok(())
1406    }
1407
1408    fn update_parameters(&self, config_params: StorageParameters) {
1409        // We serialize the dyncfg updates in StorageParameters, but configure
1410        // persist separately.
1411        config_params.dyncfg_updates.apply(self.persist.cfg());
1412
1413        self.config
1414            .lock()
1415            .expect("lock poisoned")
1416            .update(config_params);
1417    }
1418
1419    fn collection_metadata(
1420        &self,
1421        id: GlobalId,
1422    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
1423        let collections = self.collections.lock().expect("lock poisoned");
1424
1425        collections
1426            .get(&id)
1427            .map(|c| c.collection_metadata.clone())
1428            .ok_or(StorageError::IdentifierMissing(id))
1429    }
1430
1431    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1432        let collections = self.collections.lock().expect("lock poisoned");
1433
1434        collections
1435            .iter()
1436            .filter(|(_id, c)| !c.is_dropped())
1437            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1438            .collect()
1439    }
1440
1441    fn collections_frontiers(
1442        &self,
1443        ids: Vec<GlobalId>,
1444    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
1445        if ids.is_empty() {
1446            return Ok(vec![]);
1447        }
1448
1449        let collections = self.collections.lock().expect("lock poisoned");
1450
1451        let res = ids
1452            .into_iter()
1453            .map(|id| {
1454                collections
1455                    .get(&id)
1456                    .map(|c| CollectionFrontiers {
1457                        id: id.clone(),
1458                        write_frontier: c.write_frontier.clone(),
1459                        implied_capability: c.implied_capability.clone(),
1460                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1461                    })
1462                    .ok_or(StorageError::IdentifierMissing(id))
1463            })
1464            .collect::<Result<Vec<_>, _>>()?;
1465
1466        Ok(res)
1467    }
1468
1469    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1470        let collections = self.collections.lock().expect("lock poisoned");
1471
1472        let res = collections
1473            .iter()
1474            .filter(|(_id, c)| !c.is_dropped())
1475            .map(|(id, c)| CollectionFrontiers {
1476                id: id.clone(),
1477                write_frontier: c.write_frontier.clone(),
1478                implied_capability: c.implied_capability.clone(),
1479                read_capabilities: c.read_capabilities.frontier().to_owned(),
1480            })
1481            .collect_vec();
1482
1483        res
1484    }
1485
1486    async fn snapshot_stats(
1487        &self,
1488        id: GlobalId,
1489        as_of: Antichain<Self::Timestamp>,
1490    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1491        let metadata = self.collection_metadata(id)?;
1492
1493        // See the comments in StorageController::snapshot for what's going on
1494        // here.
1495        let as_of = match metadata.txns_shard.as_ref() {
1496            None => SnapshotStatsAsOf::Direct(as_of),
1497            Some(txns_id) => {
1498                assert_eq!(txns_id, self.txns_read.txns_id());
1499                let as_of = as_of
1500                    .into_option()
1501                    .expect("cannot read as_of the empty antichain");
1502                self.txns_read.update_gt(as_of.clone()).await;
1503                let data_snapshot = self
1504                    .txns_read
1505                    .data_snapshot(metadata.data_shard, as_of.clone())
1506                    .await;
1507                SnapshotStatsAsOf::Txns(data_snapshot)
1508            }
1509        };
1510        self.snapshot_stats_inner(id, as_of).await
1511    }
1512
1513    async fn snapshot_parts_stats(
1514        &self,
1515        id: GlobalId,
1516        as_of: Antichain<Self::Timestamp>,
1517    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
1518        let metadata = {
1519            let self_collections = self.collections.lock().expect("lock poisoned");
1520
1521            let collection_metadata = self_collections
1522                .get(&id)
1523                .ok_or(StorageError::IdentifierMissing(id))
1524                .map(|c| c.collection_metadata.clone());
1525
1526            match collection_metadata {
1527                Ok(m) => m,
1528                Err(e) => return Box::pin(async move { Err(e) }),
1529            }
1530        };
1531
1532        // See the comments in StorageController::snapshot for what's going on
1533        // here.
1534        let persist = Arc::clone(&self.persist);
1535        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;
1536
1537        let data_snapshot = match (metadata, as_of.as_option()) {
1538            (
1539                CollectionMetadata {
1540                    txns_shard: Some(txns_id),
1541                    data_shard,
1542                    ..
1543                },
1544                Some(as_of),
1545            ) => {
1546                assert_eq!(txns_id, *self.txns_read.txns_id());
1547                self.txns_read.update_gt(as_of.clone()).await;
1548                let data_snapshot = self
1549                    .txns_read
1550                    .data_snapshot(data_shard, as_of.clone())
1551                    .await;
1552                Some(data_snapshot)
1553            }
1554            _ => None,
1555        };
1556
1557        Box::pin(async move {
1558            let read_handle = read_handle?;
1559            let result = match data_snapshot {
1560                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
1561                None => read_handle.snapshot_parts_stats(as_of).await,
1562            };
1563            read_handle.expire().await;
1564            result.map_err(|_| StorageError::ReadBeforeSince(id))
1565        })
1566    }
1567
    /// Returns a future that resolves to `(Row, StorageDiff)` pairs for
    /// collection `id` as of `as_of`, or to a `StorageError`.
    ///
    /// This forwards to the three-argument `snapshot` method, supplying
    /// `self.txns_read` as the txns read handle.
    //
    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
    // where the as_of frontier might have multiple elements. In the current form the mutually
    // incomparable updates will be accumulated together to a state of the collection that never
    // actually existed. We should include the original time in the updates advanced by the as_of
    // frontier in the result and let the caller decide what to do with the information.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }
1580
1581    async fn snapshot_latest(
1582        &self,
1583        id: GlobalId,
1584    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1585        let upper = self.recent_upper(id).await?;
1586        let res = match upper.as_option() {
1587            Some(f) if f > &T::minimum() => {
1588                let as_of = f.step_back().unwrap();
1589
1590                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1591                snapshot
1592                    .into_iter()
1593                    .map(|(row, diff)| {
1594                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1595                        row
1596                    })
1597                    .collect()
1598            }
1599            Some(_min) => {
1600                // The collection must be empty!
1601                Vec::new()
1602            }
1603            // The collection is closed, we cannot determine a latest read
1604            // timestamp based on the upper.
1605            _ => {
1606                return Err(StorageError::InvalidUsage(
1607                    "collection closed, cannot determine a read timestamp based on the upper"
1608                        .to_string(),
1609                ));
1610            }
1611        };
1612
1613        Ok(res)
1614    }
1615
1616    fn snapshot_cursor(
1617        &self,
1618        id: GlobalId,
1619        as_of: Self::Timestamp,
1620    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
1621    where
1622        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
1623    {
1624        let metadata = match self.collection_metadata(id) {
1625            Ok(metadata) => metadata.clone(),
1626            Err(e) => return async { Err(e) }.boxed(),
1627        };
1628        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1629            // Ensure the txn's shard the controller has is the same that this
1630            // collection is registered to.
1631            assert_eq!(txns_id, self.txns_read.txns_id());
1632            self.txns_read.clone()
1633        });
1634        let persist = Arc::clone(&self.persist);
1635
1636        // See the comments in Self::snapshot for what's going on here.
1637        async move {
1638            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1639            let cursor = match txns_read {
1640                None => {
1641                    let cursor = handle
1642                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1643                        .await
1644                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1645                    SnapshotCursor {
1646                        _read_handle: handle,
1647                        cursor,
1648                    }
1649                }
1650                Some(txns_read) => {
1651                    txns_read.update_gt(as_of.clone()).await;
1652                    let data_snapshot = txns_read
1653                        .data_snapshot(metadata.data_shard, as_of.clone())
1654                        .await;
1655                    let cursor = data_snapshot
1656                        .snapshot_cursor(&mut handle, |_| true)
1657                        .await
1658                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1659                    SnapshotCursor {
1660                        _read_handle: handle,
1661                        cursor,
1662                    }
1663                }
1664            };
1665
1666            Ok(cursor)
1667        }
1668        .boxed()
1669    }
1670
    /// Returns a future that resolves to a stream of
    /// `(SourceData, Timestamp, StorageDiff)` updates for collection `id`
    /// starting from `as_of`, or to a `StorageError`.
    ///
    /// This forwards to the three-argument `snapshot_and_stream` method,
    /// supplying `self.txns_read` as the txns read handle.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }
1687
1688    fn create_update_builder(
1689        &self,
1690        id: GlobalId,
1691    ) -> BoxFuture<
1692        'static,
1693        Result<
1694            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1695            StorageError<Self::Timestamp>,
1696        >,
1697    > {
1698        let metadata = match self.collection_metadata(id) {
1699            Ok(m) => m,
1700            Err(e) => return Box::pin(async move { Err(e) }),
1701        };
1702        let persist = Arc::clone(&self.persist);
1703
1704        async move {
1705            let persist_client = persist
1706                .open(metadata.persist_location.clone())
1707                .await
1708                .expect("invalid persist usage");
1709            let write_handle = persist_client
1710                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1711                    metadata.data_shard,
1712                    Arc::new(metadata.relation_desc.clone()),
1713                    Arc::new(UnitSchema),
1714                    Diagnostics {
1715                        shard_name: id.to_string(),
1716                        handle_purpose: format!("create write batch {}", id),
1717                    },
1718                )
1719                .await
1720                .expect("invalid persist usage");
1721            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1722
1723            Ok(builder)
1724        }
1725        .boxed()
1726    }
1727
1728    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1729        let collections = self.collections.lock().expect("lock poisoned");
1730
1731        if collections.contains_key(&id) {
1732            Ok(())
1733        } else {
1734            Err(StorageError::IdentifierMissing(id))
1735        }
1736    }
1737
1738    async fn prepare_state(
1739        &self,
1740        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1741        ids_to_add: BTreeSet<GlobalId>,
1742        ids_to_drop: BTreeSet<GlobalId>,
1743        ids_to_register: BTreeMap<GlobalId, ShardId>,
1744    ) -> Result<(), StorageError<T>> {
1745        txn.insert_collection_metadata(
1746            ids_to_add
1747                .into_iter()
1748                .map(|id| (id, ShardId::new()))
1749                .collect(),
1750        )?;
1751        txn.insert_collection_metadata(ids_to_register)?;
1752
1753        // Delete the metadata for any dropped collections.
1754        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1755
1756        let dropped_shards = dropped_mappings
1757            .into_iter()
1758            .map(|(_id, shard)| shard)
1759            .collect();
1760
1761        txn.insert_unfinalized_shards(dropped_shards)?;
1762
1763        // Reconcile any shards we've successfully finalized with the shard
1764        // finalization collection.
1765        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1766        txn.mark_shards_as_finalized(finalized_shards);
1767
1768        Ok(())
1769    }
1770
    /// Creates the in-memory state for the given `collections` and opens the
    /// persist handles each of them needs.
    ///
    /// The method proceeds in these steps:
    ///
    /// 1. Sorts and deduplicates `collections`. An id that appears with two
    ///    different descriptions, or that is already known to
    ///    `self.collections` with a different description, is rejected with
    ///    `StorageError::CollectionIdReused`.
    /// 2. Looks up each collection's data shard in `storage_metadata` and
    ///    builds its `CollectionMetadata`.
    /// 3. Opens the write and since handles of all collections concurrently
    ///    (at most 50 at a time). For tables whose since is still at the
    ///    minimum timestamp and which are not in
    ///    `migrated_storage_collections`, tries to advance the since to
    ///    `register_ts`.
    /// 4. Orders the collections (tables, then other collections, then sinks)
    ///    and, while holding the `self.collections` lock, determines each
    ///    collection's initial since, inserts its `CollectionState`, registers
    ///    its handles, and installs read holds on its dependencies.
    /// 5. Calls `synchronize_finalized_shards`.
    ///
    /// # Panics
    ///
    /// Panics if the persist client cannot be opened, if a table is created
    /// without a `register_ts`, if a collection has more than one storage
    /// dependency, if an ingestion export refers to a collection that is not
    /// present or is not an ingestion, or if the collections lock is poisoned.
    //
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // A collection counts as being "in txns" when it has a txns shard,
        // unless we are in read-only mode and the collection was migrated.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. It must not be possible to:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        //
        // `dedup` removes adjacent entries that are equal in both id and
        // description, so any adjacent pair that still shares an id afterwards
        // carries conflicting descriptions.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            // Early sanity check: if we knew about a collection already its
            // description must match!
            //
            // NOTE: There could be concurrent modifications to
            // `self.collections`, but this sanity check is better than nothing.
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        // We first enrich each collection description with some additional
        // metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open `SinceHandle`s for each collections concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Reborrow `self` as a plain shared reference that the `async move`
        // blocks below can copy; the concurrent work to be processed in this
        // stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    // Errors from the metadata lookup above surface here and
                    // abort the whole `try_collect` below.
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table { .. } => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            // Only advance the since of tables that still sit at
                            // the minimum timestamp and were not migrated.
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                // NOTE(review): the result of the downgrade is
                                // discarded; presumably a failed downgrade is
                                // acceptable here -- confirm.
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        // The derived `Ord` orders by variant declaration order first, which
        // yields tables, then other collections, then sinks.
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies = self
                .determine_collection_dependencies(&*self_collections, &description.data_source)?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the source to contain this export.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table { .. } => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        // Release the collections lock before synchronizing finalized shards.
        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2111
2112    async fn alter_ingestion_source_desc(
2113        &self,
2114        ingestion_id: GlobalId,
2115        source_desc: SourceDesc,
2116    ) -> Result<(), StorageError<Self::Timestamp>> {
2117        // The StorageController checks the validity of these. And we just
2118        // accept them.
2119
2120        let mut self_collections = self.collections.lock().expect("lock poisoned");
2121        let collection = self_collections
2122            .get_mut(&ingestion_id)
2123            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
2124
2125        let curr_ingestion = match &mut collection.description.data_source {
2126            DataSource::Ingestion(active_ingestion) => active_ingestion,
2127            _ => unreachable!("verified collection refers to ingestion"),
2128        };
2129
2130        curr_ingestion.desc = source_desc;
2131        debug!("altered {ingestion_id}'s SourceDesc");
2132
2133        Ok(())
2134    }
2135
2136    async fn alter_ingestion_export_data_configs(
2137        &self,
2138        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
2139    ) -> Result<(), StorageError<Self::Timestamp>> {
2140        let mut self_collections = self.collections.lock().expect("lock poisoned");
2141
2142        for (source_export_id, new_data_config) in source_exports {
2143            // We need to adjust the data config on the CollectionState for
2144            // the source export collection directly
2145            let source_export_collection = self_collections
2146                .get_mut(&source_export_id)
2147                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2148            let ingestion_id = match &mut source_export_collection.description.data_source {
2149                DataSource::IngestionExport {
2150                    ingestion_id,
2151                    details: _,
2152                    data_config,
2153                } => {
2154                    *data_config = new_data_config.clone();
2155                    *ingestion_id
2156                }
2157                o => {
2158                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2159                    Err(StorageError::IdentifierInvalid(source_export_id))?
2160                }
2161            };
2162            // We also need to adjust the data config on the CollectionState of the
2163            // Ingestion that the export is associated with.
2164            let ingestion_collection = self_collections
2165                .get_mut(&ingestion_id)
2166                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
2167
2168            match &mut ingestion_collection.description.data_source {
2169                DataSource::Ingestion(ingestion_desc) => {
2170                    let source_export = ingestion_desc
2171                        .source_exports
2172                        .get_mut(&source_export_id)
2173                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2174
2175                    if source_export.data_config != new_data_config {
2176                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
2177                        source_export.data_config = new_data_config;
2178                    } else {
2179                        tracing::warn!(
2180                            "alter_ingestion_export_data_configs called on \
2181                                    export {source_export_id} of {ingestion_id} but \
2182                                    the data config was the same"
2183                        );
2184                    }
2185                }
2186                o => {
2187                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2188                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
2189                }
2190            }
2191        }
2192
2193        Ok(())
2194    }
2195
2196    async fn alter_ingestion_connections(
2197        &self,
2198        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
2199    ) -> Result<(), StorageError<Self::Timestamp>> {
2200        let mut self_collections = self.collections.lock().expect("lock poisoned");
2201
2202        for (id, conn) in source_connections {
2203            let collection = self_collections
2204                .get_mut(&id)
2205                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
2206
2207            match &mut collection.description.data_source {
2208                DataSource::Ingestion(ingestion) => {
2209                    // If the connection hasn't changed, there's no sense in
2210                    // re-rendering the dataflow.
2211                    if ingestion.desc.connection != conn {
2212                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
2213                        ingestion.desc.connection = conn;
2214                    } else {
2215                        warn!(
2216                            "update_source_connection called on {id} but the \
2217                            connection was the same"
2218                        );
2219                    }
2220                }
2221                o => {
2222                    warn!("update_source_connection called on {:?}", o);
2223                    Err(StorageError::IdentifierInvalid(id))?;
2224                }
2225            }
2226        }
2227
2228        Ok(())
2229    }
2230
2231    async fn alter_table_desc(
2232        &self,
2233        existing_collection: GlobalId,
2234        new_collection: GlobalId,
2235        new_desc: RelationDesc,
2236        expected_version: RelationVersion,
2237    ) -> Result<(), StorageError<Self::Timestamp>> {
2238        let data_shard = {
2239            let self_collections = self.collections.lock().expect("lock poisoned");
2240            let existing = self_collections
2241                .get(&existing_collection)
2242                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2243
2244            // TODO(alter_table): Support changes to sources.
2245            if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
2246                return Err(StorageError::IdentifierInvalid(existing_collection));
2247            }
2248
2249            existing.collection_metadata.data_shard
2250        };
2251
2252        let persist_client = self
2253            .persist
2254            .open(self.persist_location.clone())
2255            .await
2256            .unwrap();
2257
2258        // Evolve the schema of this shard.
2259        let diagnostics = Diagnostics {
2260            shard_name: existing_collection.to_string(),
2261            handle_purpose: "alter_table_desc".to_string(),
2262        };
2263        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2264        let expected_schema = expected_version.into();
2265        let schema_result = persist_client
2266            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2267                data_shard,
2268                expected_schema,
2269                &new_desc,
2270                &UnitSchema,
2271                diagnostics,
2272            )
2273            .await
2274            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2275        tracing::info!(
2276            ?existing_collection,
2277            ?new_collection,
2278            ?new_desc,
2279            "evolved schema"
2280        );
2281
2282        match schema_result {
2283            CaESchema::Ok(id) => id,
2284            // TODO(alter_table): If we get an expected mismatch we should retry.
2285            CaESchema::ExpectedMismatch {
2286                schema_id,
2287                key,
2288                val,
2289            } => {
2290                mz_ore::soft_panic_or_log!(
2291                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2292                );
2293                return Err(StorageError::Generic(anyhow::anyhow!(
2294                    "schema expected mismatch, {existing_collection:?}",
2295                )));
2296            }
2297            CaESchema::Incompatible => {
2298                mz_ore::soft_panic_or_log!(
2299                    "incompatible schema! {existing_collection} {new_desc:?}"
2300                );
2301                return Err(StorageError::Generic(anyhow::anyhow!(
2302                    "schema incompatible, {existing_collection:?}"
2303                )));
2304            }
2305        };
2306
2307        // Once the new schema is registered we can open new data handles.
2308        let (write_handle, since_handle) = self
2309            .open_data_handles(
2310                &new_collection,
2311                data_shard,
2312                None,
2313                new_desc.clone(),
2314                &persist_client,
2315            )
2316            .await;
2317
2318        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2319        // new version was registered with txn-wal?
2320
2321        // Great! Our new schema is registered with Persist, now we need to update our internal
2322        // data structures.
2323        {
2324            let mut self_collections = self.collections.lock().expect("lock poisoned");
2325
2326            // Update the existing collection so we know it's a "projection" of this new one.
2327            let existing = self_collections
2328                .get_mut(&existing_collection)
2329                .expect("existing collection missing");
2330
2331            // A higher level should already be asserting this, but let's make sure.
2332            assert!(matches!(
2333                existing.description.data_source,
2334                DataSource::Table { primary: None }
2335            ));
2336
2337            // The existing version of the table will depend on the new version.
2338            existing.description.data_source = DataSource::Table {
2339                primary: Some(new_collection),
2340            };
2341            existing.storage_dependencies.push(new_collection);
2342
2343            // Copy over the frontiers from the previous version.
2344            // The new table starts with two holds - the implied capability, and the hold from
2345            // the previous version - both at the previous version's read frontier.
2346            let implied_capability = existing.read_capabilities.frontier().to_owned();
2347            let write_frontier = existing.write_frontier.clone();
2348
2349            // Determine the relevant read capabilities on the new collection.
2350            //
2351            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2352            // here, but that only installed a ReadHold on the new collection for the implied
2353            // capability of the existing collection. This would cause runtime panics because it
2354            // would eventually result in negative read capabilities.
2355            let mut changes = ChangeBatch::new();
2356            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2357
2358            // Note: The new collection is now the "primary collection" so we specify `None` here.
2359            let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
2360            let collection_meta = CollectionMetadata {
2361                persist_location: self.persist_location.clone(),
2362                relation_desc: collection_desc.desc.clone(),
2363                data_shard,
2364                txns_shard: Some(self.txns_read.txns_id().clone()),
2365            };
2366            let collection_state = CollectionState::new(
2367                collection_desc,
2368                implied_capability,
2369                write_frontier,
2370                Vec::new(),
2371                collection_meta,
2372            );
2373
2374            // Add a record of the new collection.
2375            self_collections.insert(new_collection, collection_state);
2376
2377            let mut updates = BTreeMap::from([(new_collection, changes)]);
2378            StorageCollectionsImpl::update_read_capabilities_inner(
2379                &self.cmd_tx,
2380                &mut *self_collections,
2381                &mut updates,
2382            );
2383        };
2384
2385        // TODO(alter_table): Support changes to sources.
2386        self.register_handles(new_collection, true, since_handle, write_handle);
2387
2388        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2389
2390        Ok(())
2391    }
2392
2393    fn drop_collections_unvalidated(
2394        &self,
2395        storage_metadata: &StorageMetadata,
2396        identifiers: Vec<GlobalId>,
2397    ) {
2398        debug!(?identifiers, "drop_collections_unvalidated");
2399
2400        let mut self_collections = self.collections.lock().expect("lock poisoned");
2401
2402        for id in identifiers.iter() {
2403            let metadata = storage_metadata.get_collection_shard::<T>(*id);
2404            mz_ore::soft_assert_or_log!(
2405                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2406                "dropping {id}, but drop was not synchronized with storage \
2407                controller via `synchronize_collections`"
2408            );
2409
2410            let dropped_data_source = match self_collections.get(id) {
2411                Some(col) => col.description.data_source.clone(),
2412                None => continue,
2413            };
2414
2415            // If we are dropping source exports, we need to modify the
2416            // ingestion that it runs on.
2417            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
2418                // Adjust the source to remove this export.
2419                let ingestion = match self_collections.get_mut(&ingestion_id) {
2420                    Some(ingestion) => ingestion,
2421                    // Primary ingestion already dropped.
2422                    None => {
2423                        tracing::error!(
2424                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
2425                        );
2426                        continue;
2427                    }
2428                };
2429
2430                match &mut ingestion.description {
2431                    CollectionDescription {
2432                        data_source: DataSource::Ingestion(ingestion_desc),
2433                        ..
2434                    } => {
2435                        let removed = ingestion_desc.source_exports.remove(id);
2436                        mz_ore::soft_assert_or_log!(
2437                            removed.is_some(),
2438                            "dropped subsource {id} already removed from source exports"
2439                        );
2440                    }
2441                    _ => unreachable!(
2442                        "SourceExport must only refer to primary sources that already exist"
2443                    ),
2444                };
2445            }
2446        }
2447
2448        // Policies that advance the since to the empty antichain. We do still
2449        // honor outstanding read holds, and collections will only be dropped
2450        // once those are removed as well.
2451        //
2452        // We don't explicitly remove read capabilities! Downgrading the
2453        // frontier of the source to `[]` (the empty Antichain), will propagate
2454        // to the storage dependencies.
2455        let mut finalized_policies = Vec::new();
2456
2457        for id in identifiers {
2458            // Make sure it's still there, might already have been deleted.
2459            if self_collections.contains_key(&id) {
2460                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2461            }
2462        }
2463        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2464
2465        drop(self_collections);
2466
2467        self.synchronize_finalized_shards(storage_metadata);
2468    }
2469
2470    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2471        let mut collections = self.collections.lock().expect("lock poisoned");
2472
2473        if tracing::enabled!(tracing::Level::TRACE) {
2474            let user_capabilities = collections
2475                .iter_mut()
2476                .filter(|(id, _c)| id.is_user())
2477                .map(|(id, c)| {
2478                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2479                    (*id, c.implied_capability.clone(), updates)
2480                })
2481                .collect_vec();
2482
2483            trace!(?policies, ?user_capabilities, "set_read_policies");
2484        }
2485
2486        self.set_read_policies_inner(&mut collections, policies);
2487
2488        if tracing::enabled!(tracing::Level::TRACE) {
2489            let user_capabilities = collections
2490                .iter_mut()
2491                .filter(|(id, _c)| id.is_user())
2492                .map(|(id, c)| {
2493                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2494                    (*id, c.implied_capability.clone(), updates)
2495                })
2496                .collect_vec();
2497
2498            trace!(?user_capabilities, "after! set_read_policies");
2499        }
2500    }
2501
2502    fn acquire_read_holds(
2503        &self,
2504        desired_holds: Vec<GlobalId>,
2505    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
2506        if desired_holds.is_empty() {
2507            return Ok(vec![]);
2508        }
2509
2510        let mut collections = self.collections.lock().expect("lock poisoned");
2511
2512        let mut advanced_holds = Vec::new();
2513        // We advance the holds by our current since frontier. Can't acquire
2514        // holds for times that have been compacted away!
2515        //
2516        // NOTE: We acquire read holds at the earliest possible time rather than
2517        // at the implied capability. This is so that, for example, adapter can
2518        // acquire a read hold to hold back the frontier, giving the COMPUTE
2519        // controller a chance to also acquire a read hold at that early
2520        // frontier. If/when we change the interplay between adapter and COMPUTE
2521        // to pass around ReadHold tokens, we might tighten this up and instead
2522        // acquire read holds at the implied capability.
2523        for id in desired_holds.iter() {
2524            let collection = collections
2525                .get(id)
2526                .ok_or(ReadHoldError::CollectionMissing(*id))?;
2527            let since = collection.read_capabilities.frontier().to_owned();
2528            advanced_holds.push((*id, since));
2529        }
2530
2531        let mut updates = advanced_holds
2532            .iter()
2533            .map(|(id, hold)| {
2534                let mut changes = ChangeBatch::new();
2535                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2536                (*id, changes)
2537            })
2538            .collect::<BTreeMap<_, _>>();
2539
2540        StorageCollectionsImpl::update_read_capabilities_inner(
2541            &self.cmd_tx,
2542            &mut collections,
2543            &mut updates,
2544        );
2545
2546        let acquired_holds = advanced_holds
2547            .into_iter()
2548            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2549            .collect_vec();
2550
2551        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2552
2553        Ok(acquired_holds)
2554    }
2555
2556    /// Determine time dependence information for the object.
2557    fn determine_time_dependence(
2558        &self,
2559        id: GlobalId,
2560    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2561        use TimeDependenceError::CollectionMissing;
2562        let collections = self.collections.lock().expect("lock poisoned");
2563        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);
2564
2565        let mut result = None;
2566
2567        while let Some(c) = collection.take() {
2568            use DataSource::*;
2569            if let Some(timeline) = &c.description.timeline {
2570                // Only the epoch timeline follows wall-clock.
2571                if *timeline != Timeline::EpochMilliseconds {
2572                    break;
2573                }
2574            }
2575            match &c.description.data_source {
2576                Ingestion(ingestion) => {
2577                    use GenericSourceConnection::*;
2578                    match ingestion.desc.connection {
2579                        // Kafka, Postgres, MySql, and SQL Server sources all
2580                        // follow wall clock.
2581                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
2582                            result = Some(TimeDependence::default())
2583                        }
2584                        // Load generators not further specified.
2585                        LoadGenerator(_) => {}
2586                    }
2587                }
2588                IngestionExport { ingestion_id, .. } => {
2589                    let c = collections
2590                        .get(ingestion_id)
2591                        .ok_or(CollectionMissing(*ingestion_id))?;
2592                    collection = Some(c);
2593                }
2594                // Introspection, other, progress, table, and webhook sources follow wall clock.
2595                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2596                    result = Some(TimeDependence::default())
2597                }
2598                // Materialized views, continual tasks, etc, aren't managed by storage.
2599                Other => {}
2600                Sink { .. } => {}
2601            };
2602        }
2603        Ok(result)
2604    }
2605}
2606
2607/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
2608///
2609/// When a [StorageCollections] is in read-only mode, we will only ever acquire
2610/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
2611/// since is considered a write. Conversely, when in read-write mode, we acquire
2612/// [SinceHandle].
2613#[derive(Debug)]
2614enum SinceHandleWrapper<T>
2615where
2616    T: TimelyTimestamp + Lattice + Codec64,
2617{
2618    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
2619    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
2620}
2621
2622impl<T> SinceHandleWrapper<T>
2623where
2624    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2625{
2626    pub fn since(&self) -> &Antichain<T> {
2627        match self {
2628            Self::Critical(handle) => handle.since(),
2629            Self::Leased(handle) => handle.since(),
2630        }
2631    }
2632
2633    pub fn opaque(&self) -> PersistEpoch {
2634        match self {
2635            Self::Critical(handle) => handle.opaque().clone(),
2636            Self::Leased(_handle) => {
2637                // The opaque is expected to be used with
2638                // `compare_and_downgrade_since`, and the leased handle doesn't
2639                // have a notion of an opaque. We pretend here and in
2640                // `compare_and_downgrade_since`.
2641                PersistEpoch(None)
2642            }
2643        }
2644    }
2645
2646    pub async fn compare_and_downgrade_since(
2647        &mut self,
2648        expected: &PersistEpoch,
2649        new: (&PersistEpoch, &Antichain<T>),
2650    ) -> Result<Antichain<T>, PersistEpoch> {
2651        match self {
2652            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
2653            Self::Leased(handle) => {
2654                let (opaque, since) = new;
2655                assert_none!(opaque.0);
2656
2657                handle.downgrade_since(since).await;
2658
2659                Ok(since.clone())
2660            }
2661        }
2662    }
2663
2664    pub async fn maybe_compare_and_downgrade_since(
2665        &mut self,
2666        expected: &PersistEpoch,
2667        new: (&PersistEpoch, &Antichain<T>),
2668    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2669        match self {
2670            Self::Critical(handle) => {
2671                handle
2672                    .maybe_compare_and_downgrade_since(expected, new)
2673                    .await
2674            }
2675            Self::Leased(handle) => {
2676                let (opaque, since) = new;
2677                assert_none!(opaque.0);
2678
2679                handle.maybe_downgrade_since(since).await;
2680
2681                Some(Ok(since.clone()))
2682            }
2683        }
2684    }
2685
2686    pub fn snapshot_stats(
2687        &self,
2688        id: GlobalId,
2689        as_of: Option<Antichain<T>>,
2690    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2691        match self {
2692            Self::Critical(handle) => {
2693                let res = handle
2694                    .snapshot_stats(as_of)
2695                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2696                Box::pin(res)
2697            }
2698            Self::Leased(handle) => {
2699                let res = handle
2700                    .snapshot_stats(as_of)
2701                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2702                Box::pin(res)
2703            }
2704        }
2705    }
2706
2707    pub fn snapshot_stats_from_txn(
2708        &self,
2709        id: GlobalId,
2710        data_snapshot: DataSnapshot<T>,
2711    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2712        match self {
2713            Self::Critical(handle) => Box::pin(
2714                data_snapshot
2715                    .snapshot_stats_from_critical(handle)
2716                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2717            ),
2718            Self::Leased(handle) => Box::pin(
2719                data_snapshot
2720                    .snapshot_stats_from_leased(handle)
2721                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2722            ),
2723        }
2724    }
2725}
2726
2727/// State maintained about individual collections.
2728#[derive(Debug, Clone)]
2729struct CollectionState<T> {
2730    /// Description with which the collection was created
2731    pub description: CollectionDescription<T>,
2732
2733    /// Accumulation of read capabilities for the collection.
2734    ///
2735    /// This accumulation will always contain `self.implied_capability`, but may
2736    /// also contain capabilities held by others who have read dependencies on
2737    /// this collection.
2738    pub read_capabilities: MutableAntichain<T>,
2739
2740    /// The implicit capability associated with collection creation.  This
2741    /// should never be less than the since of the associated persist
2742    /// collection.
2743    pub implied_capability: Antichain<T>,
2744
2745    /// The policy to use to downgrade `self.implied_capability`.
2746    pub read_policy: ReadPolicy<T>,
2747
2748    /// Storage identifiers on which this collection depends.
2749    pub storage_dependencies: Vec<GlobalId>,
2750
2751    /// Reported write frontier.
2752    pub write_frontier: Antichain<T>,
2753
2754    pub collection_metadata: CollectionMetadata,
2755}
2756
2757impl<T: TimelyTimestamp> CollectionState<T> {
2758    /// Creates a new collection state, with an initial read policy valid from
2759    /// `since`.
2760    pub fn new(
2761        description: CollectionDescription<T>,
2762        since: Antichain<T>,
2763        write_frontier: Antichain<T>,
2764        storage_dependencies: Vec<GlobalId>,
2765        metadata: CollectionMetadata,
2766    ) -> Self {
2767        let mut read_capabilities = MutableAntichain::new();
2768        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2769        Self {
2770            description,
2771            read_capabilities,
2772            implied_capability: since.clone(),
2773            read_policy: ReadPolicy::NoPolicy {
2774                initial_since: since,
2775            },
2776            storage_dependencies,
2777            write_frontier,
2778            collection_metadata: metadata,
2779        }
2780    }
2781
2782    /// Returns whether the collection was dropped.
2783    pub fn is_dropped(&self) -> bool {
2784        self.read_capabilities.is_empty()
2785    }
2786}
2787
2788/// A task that keeps persist handles, downgrades sinces when asked,
2789/// periodically gets recent uppers from them, and updates the shard collection
2790/// state when needed.
2791///
2792/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
2793#[derive(Debug)]
2794struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
2795    config: Arc<Mutex<StorageConfiguration>>,
2796    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
2797    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
2798    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
2799    finalizable_shards: Arc<ShardIdSet>,
2800    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
2801    // So we know what shard ID corresponds to what global ID, which we need
2802    // when re-enqueing futures for determining the next upper update.
2803    shard_by_id: BTreeMap<GlobalId, ShardId>,
2804    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
2805    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
2806    txns_shards: BTreeSet<GlobalId>,
2807}
2808
2809#[derive(Debug)]
2810enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
2811    Register {
2812        id: GlobalId,
2813        is_in_txns: bool,
2814        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
2815        since_handle: SinceHandleWrapper<T>,
2816    },
2817    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
2818    SnapshotStats(
2819        GlobalId,
2820        SnapshotStatsAsOf<T>,
2821        oneshot::Sender<SnapshotStatsRes<T>>,
2822    ),
2823}
2824
2825/// A newtype wrapper to hang a Debug impl off of.
2826pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2827
2828impl<T> Debug for SnapshotStatsRes<T> {
2829    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2830        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2831    }
2832}
2833
2834impl<T> BackgroundTask<T>
2835where
2836    T: TimelyTimestamp
2837        + Lattice
2838        + Codec64
2839        + From<EpochMillis>
2840        + TimestampManipulation
2841        + Into<mz_repr::Timestamp>
2842        + Sync,
2843{
2844    async fn run(&mut self) {
2845        // Futures that fetch the recent upper from all other shards.
2846        let mut upper_futures: FuturesUnordered<
2847            std::pin::Pin<
2848                Box<
2849                    dyn Future<
2850                            Output = (
2851                                GlobalId,
2852                                WriteHandle<SourceData, (), T, StorageDiff>,
2853                                Antichain<T>,
2854                            ),
2855                        > + Send,
2856                >,
2857            >,
2858        > = FuturesUnordered::new();
2859
2860        let gen_upper_future =
2861            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
2862                let fut = async move {
2863                    soft_assert_or_log!(
2864                        !prev_upper.is_empty(),
2865                        "cannot await progress when upper is already empty"
2866                    );
2867                    handle.wait_for_upper_past(&prev_upper).await;
2868                    let new_upper = handle.shared_upper();
2869                    (id, handle, new_upper)
2870                };
2871
2872                fut
2873            };
2874
2875        let mut txns_upper_future = match self.txns_handle.take() {
2876            Some(txns_handle) => {
2877                let upper = txns_handle.upper().clone();
2878                let txns_upper_future =
2879                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
2880                txns_upper_future.boxed()
2881            }
2882            None => async { std::future::pending().await }.boxed(),
2883        };
2884
2885        loop {
2886            tokio::select! {
2887                (id, handle, upper) = &mut txns_upper_future => {
2888                    trace!("new upper from txns shard: {:?}", upper);
2889                    let mut uppers = Vec::new();
2890                    for id in self.txns_shards.iter() {
2891                        uppers.push((*id, &upper));
2892                    }
2893                    self.update_write_frontiers(&uppers).await;
2894
2895                    let fut = gen_upper_future(id, handle, upper);
2896                    txns_upper_future = fut.boxed();
2897                }
2898                Some((id, handle, upper)) = upper_futures.next() => {
2899                    if id.is_user() {
2900                        trace!("new upper for collection {id}: {:?}", upper);
2901                    }
2902                    let current_shard = self.shard_by_id.get(&id);
2903                    if let Some(shard_id) = current_shard {
2904                        if shard_id == &handle.shard_id() {
2905                            // Still current, so process the update and enqueue
2906                            // again!
2907                            let uppers = &[(id, &upper)];
2908                            self.update_write_frontiers(uppers).await;
2909                            if !upper.is_empty() {
2910                                let fut = gen_upper_future(id, handle, upper);
2911                                upper_futures.push(fut.boxed());
2912                            }
2913                        } else {
2914                            // Be polite and expire the write handle. This can
2915                            // happen when we get an upper update for a write
2916                            // handle that has since been replaced via Update.
2917                            handle.expire().await;
2918                        }
2919                    }
2920                }
2921                cmd = self.cmds_rx.recv() => {
2922                    let cmd = if let Some(cmd) = cmd {
2923                        cmd
2924                    } else {
2925                        // We're done!
2926                        break;
2927                    };
2928
2929                    match cmd {
2930                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
2931                            debug!("registering handles for {}", id);
2932                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
2933                            if previous.is_some() {
2934                                panic!("already registered a WriteHandle for collection {id}");
2935                            }
2936
2937                            let previous = self.since_handles.insert(id, since_handle);
2938                            if previous.is_some() {
2939                                panic!("already registered a SinceHandle for collection {id}");
2940                            }
2941
2942                            if is_in_txns {
2943                                self.txns_shards.insert(id);
2944                            } else {
2945                                let upper = write_handle.upper().clone();
2946                                if !upper.is_empty() {
2947                                    let fut = gen_upper_future(id, write_handle, upper);
2948                                    upper_futures.push(fut.boxed());
2949                                }
2950                            }
2951
2952                        }
2953                        BackgroundCmd::DowngradeSince(cmds) => {
2954                            self.downgrade_sinces(cmds).await;
2955                        }
2956                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
2957                            // NB: The requested as_of could be arbitrarily far
2958                            // in the future. So, in order to avoid blocking
2959                            // this loop until it's available and the
2960                            // `snapshot_stats` call resolves, instead return
2961                            // the future to the caller and await it there.
2962                            let res = match self.since_handles.get(&id) {
2963                                Some(x) => {
2964                                    let fut: BoxFuture<
2965                                        'static,
2966                                        Result<SnapshotStats, StorageError<T>>,
2967                                    > = match as_of {
2968                                        SnapshotStatsAsOf::Direct(as_of) => {
2969                                            x.snapshot_stats(id, Some(as_of))
2970                                        }
2971                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
2972                                            x.snapshot_stats_from_txn(id, data_snapshot)
2973                                        }
2974                                    };
2975                                    SnapshotStatsRes(fut)
2976                                }
2977                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
2978                                    StorageError::IdentifierMissing(id),
2979                                )))),
2980                            };
2981                            // It's fine if the listener hung up.
2982                            let _ = tx.send(res);
2983                        }
2984                    }
2985                }
2986                Some(holds_changes) = self.holds_rx.recv() => {
2987                    let mut batched_changes = BTreeMap::new();
2988                    batched_changes.insert(holds_changes.0, holds_changes.1);
2989
2990                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
2991                        let entry = batched_changes.entry(holds_changes.0);
2992                        entry
2993                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
2994                            .or_insert_with(|| holds_changes.1);
2995                    }
2996
2997                    let mut collections = self.collections.lock().expect("lock poisoned");
2998
2999                    let user_changes = batched_changes
3000                        .iter()
3001                        .filter(|(id, _c)| id.is_user())
3002                        .map(|(id, c)| {
3003                            (id.clone(), c.clone())
3004                        })
3005                        .collect_vec();
3006
3007                    if !user_changes.is_empty() {
3008                        trace!(?user_changes, "applying holds changes from channel");
3009                    }
3010
3011                    StorageCollectionsImpl::update_read_capabilities_inner(
3012                        &self.cmds_tx,
3013                        &mut collections,
3014                        &mut batched_changes,
3015                    );
3016                }
3017            }
3018        }
3019
3020        warn!("BackgroundTask shutting down");
3021    }
3022
3023    #[instrument(level = "debug")]
3024    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
3025        let mut read_capability_changes = BTreeMap::default();
3026
3027        let mut self_collections = self.collections.lock().expect("lock poisoned");
3028
3029        for (id, new_upper) in updates.iter() {
3030            let collection = if let Some(c) = self_collections.get_mut(id) {
3031                c
3032            } else {
3033                trace!(
3034                    "Reference to absent collection {id}, due to concurrent removal of that collection"
3035                );
3036                continue;
3037            };
3038
3039            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
3040                collection.write_frontier.clone_from(new_upper);
3041            }
3042
3043            let mut new_read_capability = collection
3044                .read_policy
3045                .frontier(collection.write_frontier.borrow());
3046
3047            if id.is_user() {
3048                trace!(
3049                    %id,
3050                    implied_capability = ?collection.implied_capability,
3051                    policy = ?collection.read_policy,
3052                    write_frontier = ?collection.write_frontier,
3053                    ?new_read_capability,
3054                    "update_write_frontiers");
3055            }
3056
3057            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
3058                let mut update = ChangeBatch::new();
3059                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
3060                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
3061                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
3062
3063                if !update.is_empty() {
3064                    read_capability_changes.insert(*id, update);
3065                }
3066            }
3067        }
3068
3069        if !read_capability_changes.is_empty() {
3070            StorageCollectionsImpl::update_read_capabilities_inner(
3071                &self.cmds_tx,
3072                &mut self_collections,
3073                &mut read_capability_changes,
3074            );
3075        }
3076    }
3077
3078    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
3079        for (id, new_since) in cmds {
3080            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
3081                c
3082            } else {
3083                // This can happen when someone concurrently drops a collection.
3084                trace!("downgrade_sinces: reference to absent collection {id}");
3085                continue;
3086            };
3087
3088            if id.is_user() {
3089                trace!("downgrading since of {} to {:?}", id, new_since);
3090            }
3091
3092            let epoch = since_handle.opaque().clone();
3093            let result = if new_since.is_empty() {
3094                // A shard's since reaching the empty frontier is a prereq for
3095                // being able to finalize a shard, so the final downgrade should
3096                // never be rate-limited.
3097                let res = Some(
3098                    since_handle
3099                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3100                        .await,
3101                );
3102
3103                info!(%id, "removing persist handles because the since advanced to []!");
3104
3105                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
3106                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
3107                    shard_id
3108                } else {
3109                    panic!("missing GlobalId -> ShardId mapping for id {id}");
3110                };
3111
3112                // We're not responsible for writes to tables, so we also don't
3113                // de-register them from the txn system. Whoever is responsible
3114                // will remove them. We only make sure to remove the table from
3115                // our tracking.
3116                self.txns_shards.remove(&id);
3117
3118                if !self
3119                    .config
3120                    .lock()
3121                    .expect("lock poisoned")
3122                    .parameters
3123                    .finalize_shards
3124                {
3125                    info!(
3126                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3127                    );
3128                    return;
3129                }
3130
3131                info!(%id, %dropped_shard_id, "enqueing shard finalization due to dropped collection and dropped persist handle");
3132
3133                self.finalizable_shards.lock().insert(dropped_shard_id);
3134
3135                res
3136            } else {
3137                since_handle
3138                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3139                    .await
3140            };
3141
3142            if let Some(Err(other_epoch)) = result {
3143                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
3144            }
3145        }
3146    }
3147}
3148
/// Everything `finalize_shards_task` needs in order to run.
struct FinalizeShardsTaskConfig {
    /// Epoch of this process; converted into a `PersistEpoch` and used when
    /// downgrading a shard's critical since during finalization.
    envd_epoch: NonZeroI64,
    /// Shared storage configuration; consulted on every tick for the
    /// `finalize_shards` parameter and the dyncfg `ConfigSet`.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Counters for started, failed and succeeded finalization attempts.
    metrics: StorageCollectionsMetrics,
    /// Shards that are waiting to be finalized.
    finalizable_shards: Arc<ShardIdSet>,
    /// Shards the task has moved out of `finalizable_shards` after a
    /// finalization attempt reported success.
    finalized_shards: Arc<ShardIdSet>,
    /// Location passed to `persist` when opening a client.
    persist_location: PersistLocation,
    /// Cache from which the persist client is opened.
    persist: Arc<PersistClientCache>,
    /// When `true`, the task logs a message and exits without doing any work.
    read_only: bool,
}
3159
3160async fn finalize_shards_task<T>(
3161    FinalizeShardsTaskConfig {
3162        envd_epoch,
3163        config,
3164        metrics,
3165        finalizable_shards,
3166        finalized_shards,
3167        persist_location,
3168        persist,
3169        read_only,
3170    }: FinalizeShardsTaskConfig,
3171) where
3172    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
3173{
3174    if read_only {
3175        info!("disabling shard finalization in read only mode");
3176        return;
3177    }
3178
3179    let mut interval = tokio::time::interval(Duration::from_secs(5));
3180    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3181    loop {
3182        interval.tick().await;
3183
3184        if !config
3185            .lock()
3186            .expect("lock poisoned")
3187            .parameters
3188            .finalize_shards
3189        {
3190            debug!(
3191                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3192            );
3193            continue;
3194        }
3195
3196        let current_finalizable_shards = {
3197            // We hold the lock for as short as possible and pull our cloned set
3198            // of shards.
3199            finalizable_shards.lock().iter().cloned().collect_vec()
3200        };
3201
3202        if current_finalizable_shards.is_empty() {
3203            debug!("no shards to finalize");
3204            continue;
3205        }
3206
3207        debug!(?current_finalizable_shards, "attempting to finalize shards");
3208
3209        // Open a persist client to delete unused shards.
3210        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3211
3212        let metrics = &metrics;
3213        let finalizable_shards = &finalizable_shards;
3214        let finalized_shards = &finalized_shards;
3215        let persist_client = &persist_client;
3216        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3217
3218        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3219            .get(config.lock().expect("lock poisoned").config_set());
3220
3221        let epoch = &PersistEpoch::from(envd_epoch);
3222
3223        futures::stream::iter(current_finalizable_shards.clone())
3224            .map(|shard_id| async move {
3225                let persist_client = persist_client.clone();
3226                let diagnostics = diagnostics.clone();
3227                let epoch = epoch.clone();
3228
3229                metrics.finalization_started.inc();
3230
3231                let is_finalized = persist_client
3232                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3233                    .await
3234                    .expect("invalid persist usage");
3235
3236                if is_finalized {
3237                    debug!(%shard_id, "shard is already finalized!");
3238                    Some(shard_id)
3239                } else {
3240                    debug!(%shard_id, "finalizing shard");
3241                        let finalize = || async move {
3242                        // TODO: thread the global ID into the shard finalization WAL
3243                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3244
3245                        let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
3246                        let (key_schema, val_schema) = match schemas {
3247                            Some((_, key_schema, val_schema)) => (key_schema, val_schema),
3248                            None => (RelationDesc::empty(), UnitSchema),
3249                        };
3250
3251                        let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
3252                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3253                            persist_client
3254                                .open_writer(
3255                                    shard_id,
3256                                    Arc::new(key_schema),
3257                                    Arc::new(val_schema),
3258                                    diagnostics,
3259                                )
3260                                .await
3261                                .expect("invalid persist usage");
3262
3263                        let upper = write_handle.upper();
3264
3265                        if !upper.is_empty() {
3266                            let append = write_handle
3267                                .append(empty_batch, upper.clone(), Antichain::new())
3268                                .await?;
3269
3270                            if let Err(e) = append {
3271                                warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
3272                                return Ok(());
3273                            }
3274                        }
3275                        write_handle.expire().await;
3276
3277                        if force_downgrade_since {
3278                            let mut since_handle: SinceHandle<
3279                                SourceData,
3280                                (),
3281                                T,
3282                                StorageDiff,
3283                                PersistEpoch,
3284                            > = persist_client
3285                                .open_critical_since(
3286                                    shard_id,
3287                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
3288                                    Diagnostics::from_purpose("finalizing shards"),
3289                                )
3290                                .await
3291                                .expect("invalid persist usage");
3292                            let handle_epoch = since_handle.opaque().clone();
3293                            let our_epoch = epoch.clone();
3294                            let epoch = if our_epoch.0 > handle_epoch.0 {
3295                                // We're newer, but it's fine to use the
3296                                // handle's old epoch to try and downgrade.
3297                                handle_epoch
3298                            } else {
3299                                // Good luck, buddy! The downgrade below will
3300                                // not succeed. There's a process with a newer
3301                                // epoch out there and someone at some juncture
3302                                // will fence out this process.
3303                                our_epoch
3304                            };
3305                            let new_since = Antichain::new();
3306                            let downgrade = since_handle
3307                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3308                                .await;
3309                            if let Err(e) = downgrade {
3310                                warn!(
3311                                    "tried to finalize a shard with an advancing epoch: {e:?}"
3312                                );
3313                                return Ok(());
3314                            }
3315                            // Not available now, so finalization is broken.
3316                            // since_handle.expire().await;
3317                        }
3318
3319                        persist_client
3320                            .finalize_shard::<SourceData, (), T, StorageDiff>(
3321                                shard_id,
3322                                Diagnostics::from_purpose("finalizing shards"),
3323                            )
3324                            .await
3325                    };
3326
3327                    match finalize().await {
3328                        Err(e) => {
3329                            // Rather than error, just leave this shard as
3330                            // one to finalize later.
3331                            warn!("error during finalization of shard {shard_id}: {e:?}");
3332                            None
3333                        }
3334                        Ok(()) => {
3335                            debug!(%shard_id, "finalize success!");
3336                            Some(shard_id)
3337                        }
3338                    }
3339                }
3340            })
3341            // Poll each future for each collection concurrently, maximum of 10
3342            // at a time.
3343            // TODO(benesch): the concurrency here should be configurable
3344            // via LaunchDarkly.
3345            .buffer_unordered(10)
3346            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3347            // The closure passed to `for_each` must remain fast or we risk
3348            // starving the finalization futures of calls to `poll`.
3349            .for_each(|shard_id| async move {
3350                match shard_id {
3351                    None => metrics.finalization_failed.inc(),
3352                    Some(shard_id) => {
3353                        // We make successfully finalized shards available for
3354                        // removal from the finalization WAL one by one, so that
3355                        // a handful of stuck shards don't prevent us from
3356                        // removing the shards that have made progress. The
3357                        // overhead of repeatedly acquiring and releasing the
3358                        // locks is negligible.
3359                        {
3360                            let mut finalizable_shards = finalizable_shards.lock();
3361                            let mut finalized_shards = finalized_shards.lock();
3362                            finalizable_shards.remove(&shard_id);
3363                            finalized_shards.insert(shard_id);
3364                        }
3365
3366                        metrics.finalization_succeeded.inc();
3367                    }
3368                }
3369            })
3370            .await;
3371
3372        debug!("done finalizing shards");
3373    }
3374}
3375
/// The as-of carried by a `BackgroundCmd::SnapshotStats` request.
///
/// The background task serves `Direct` through `snapshot_stats` on the
/// collection's since handle and `Txns` through `snapshot_stats_from_txn`.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard with an "eager" upper (one that continually advances
    /// as time passes, even if no writes are coming in).
    Direct(Antichain<T>),
    /// Stats for a shard with a "lazy" upper (one that only physically advances
    /// in response to writes).
    Txns(DataSnapshot<T>),
}
3385
3386#[cfg(test)]
3387mod tests {
3388    use std::str::FromStr;
3389    use std::sync::Arc;
3390
3391    use mz_build_info::DUMMY_BUILD_INFO;
3392    use mz_dyncfg::ConfigSet;
3393    use mz_ore::assert_err;
3394    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
3395    use mz_ore::now::SYSTEM_TIME;
3396    use mz_ore::url::SensitiveUrl;
3397    use mz_persist_client::cache::PersistClientCache;
3398    use mz_persist_client::cfg::PersistConfig;
3399    use mz_persist_client::rpc::PubSubClientConnection;
3400    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
3401    use mz_persist_types::codec_impls::UnitSchema;
3402    use mz_repr::{RelationDesc, Row};
3403    use mz_secrets::InMemorySecretsController;
3404
3405    use super::*;
3406
3407    #[mz_ore::test(tokio::test)]
3408    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
3409    async fn test_snapshot_stats(&self) {
3410        let persist_location = PersistLocation {
3411            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3412            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3413        };
3414        let persist_client = PersistClientCache::new(
3415            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
3416            &MetricsRegistry::new(),
3417            |_, _| PubSubClientConnection::noop(),
3418        );
3419        let persist_client = Arc::new(persist_client);
3420
3421        let (cmds_tx, mut background_task) =
3422            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
3423        let background_task =
3424            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
3425                background_task.run().await
3426            });
3427
3428        let persist = persist_client.open(persist_location).await.unwrap();
3429
3430        let shard_id = ShardId::new();
3431        let since_handle = persist
3432            .open_critical_since(
3433                shard_id,
3434                PersistClient::CONTROLLER_CRITICAL_SINCE,
3435                Diagnostics::for_tests(),
3436            )
3437            .await
3438            .unwrap();
3439        let write_handle = persist
3440            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3441                shard_id,
3442                Arc::new(RelationDesc::empty()),
3443                Arc::new(UnitSchema),
3444                Diagnostics::for_tests(),
3445            )
3446            .await
3447            .unwrap();
3448
3449        cmds_tx
3450            .send(BackgroundCmd::Register {
3451                id: GlobalId::User(1),
3452                is_in_txns: false,
3453                since_handle: SinceHandleWrapper::Critical(since_handle),
3454                write_handle,
3455            })
3456            .unwrap();
3457
3458        let mut write_handle = persist
3459            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3460                shard_id,
3461                Arc::new(RelationDesc::empty()),
3462                Arc::new(UnitSchema),
3463                Diagnostics::for_tests(),
3464            )
3465            .await
3466            .unwrap();
3467
3468        // No stats for unknown GlobalId.
3469        let stats =
3470            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
3471        assert_err!(stats);
3472
3473        // Stats don't resolve for as_of past the upper.
3474        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3475        assert_none!(stats_fut.now_or_never());
3476
3477        // // Call it again because now_or_never consumed our future and it's not clone-able.
3478        let stats_ts1_fut =
3479            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3480
3481        // Write some data.
3482        let data = (
3483            (SourceData(Ok(Row::default())), ()),
3484            mz_repr::Timestamp::from(0),
3485            1i64,
3486        );
3487        let () = write_handle
3488            .compare_and_append(
3489                &[data],
3490                Antichain::from_elem(0.into()),
3491                Antichain::from_elem(1.into()),
3492            )
3493            .await
3494            .unwrap()
3495            .unwrap();
3496
3497        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
3498        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
3499            .await
3500            .unwrap();
3501        assert_eq!(stats.num_updates, 1);
3502
3503        // Write more data and unblock the ts 1 call
3504        let data = (
3505            (SourceData(Ok(Row::default())), ()),
3506            mz_repr::Timestamp::from(1),
3507            1i64,
3508        );
3509        let () = write_handle
3510            .compare_and_append(
3511                &[data],
3512                Antichain::from_elem(1.into()),
3513                Antichain::from_elem(2.into()),
3514            )
3515            .await
3516            .unwrap()
3517            .unwrap();
3518
3519        let stats = stats_ts1_fut.await.unwrap();
3520        assert_eq!(stats.num_updates, 2);
3521
3522        // Make sure it runs until at least here.
3523        drop(background_task);
3524    }
3525
3526    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
3527        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
3528        id: GlobalId,
3529        as_of: Antichain<T>,
3530    ) -> Result<SnapshotStats, StorageError<T>> {
3531        let (tx, rx) = oneshot::channel();
3532        cmds_tx
3533            .send(BackgroundCmd::SnapshotStats(
3534                id,
3535                SnapshotStatsAsOf::Direct(as_of),
3536                tx,
3537            ))
3538            .unwrap();
3539        let res = rx.await.expect("BackgroundTask should be live").0;
3540
3541        res.await
3542    }
3543
3544    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
3545        fn new_for_test(
3546            _persist_location: PersistLocation,
3547            _persist_client: Arc<PersistClientCache>,
3548        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
3549            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
3550            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
3551            let connection_context =
3552                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));
3553
3554            let task = Self {
3555                config: Arc::new(Mutex::new(StorageConfiguration::new(
3556                    connection_context,
3557                    ConfigSet::default(),
3558                ))),
3559                cmds_tx: cmds_tx.clone(),
3560                cmds_rx,
3561                holds_rx,
3562                finalizable_shards: Arc::new(ShardIdSet::new(
3563                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
3564                )),
3565                collections: Arc::new(Mutex::new(BTreeMap::new())),
3566                shard_by_id: BTreeMap::new(),
3567                since_handles: BTreeMap::new(),
3568                txns_handle: None,
3569                txns_shards: BTreeSet::new(),
3570            };
3571
3572            (cmds_tx, task)
3573        }
3574    }
3575}