mz_storage_client/
storage_collections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::num::NonZeroI64;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use futures::future::BoxFuture;
22use futures::stream::{BoxStream, FuturesUnordered};
23use futures::{Future, FutureExt, StreamExt};
24use itertools::Itertools;
25
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::{EpochMillis, NowFn};
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::Codec64;
40use mz_persist_types::codec_impls::UnitSchema;
41use mz_persist_types::txn::TxnsCodec;
42use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
43use mz_storage_types::StorageDiff;
44use mz_storage_types::configuration::StorageConfiguration;
45use mz_storage_types::connections::ConnectionContext;
46use mz_storage_types::connections::inline::InlinedConnection;
47use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
48use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
49use mz_storage_types::parameters::StorageParameters;
50use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
51use mz_storage_types::read_policy::ReadPolicy;
52use mz_storage_types::sources::{
53    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
54    SourceExportDataConfig, Timeline,
55};
56use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
57use mz_txn_wal::metrics::Metrics as TxnMetrics;
58use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
59use mz_txn_wal::txns::TxnsHandle;
60use timely::PartialOrder;
61use timely::order::TotalOrder;
62use timely::progress::frontier::MutableAntichain;
63use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
64use tokio::sync::{mpsc, oneshot};
65use tokio::time::MissedTickBehavior;
66use tracing::{debug, info, trace, warn};
67
68use crate::client::TimestamplessUpdateBuilder;
69use crate::controller::{
70    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
71};
72use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
73
74mod metrics;
75
76/// An abstraction for keeping track of storage collections and managing access
77/// to them.
78///
79/// Responsibilities:
80///
81/// - Keeps a critical persist handle for holding the since of collections
82///   where it need to be.
83///
84/// - Drives the since forward based on the upper of a collection and a
85///   [ReadPolicy].
86///
87/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
88/// advancing while it needs to be read at a specific time.
89#[async_trait]
90pub trait StorageCollections: Debug {
91    type Timestamp: TimelyTimestamp;
92
93    /// On boot, reconcile this [StorageCollections] with outside state. We get
94    /// a [StorageTxn] where we can record any durable state that we need.
95    ///
96    /// We get `init_ids`, which tells us about all collections that currently
97    /// exist, so that we can record durable state for those that _we_ don't
98    /// know yet about.
99    async fn initialize_state(
100        &self,
101        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
102        init_ids: BTreeSet<GlobalId>,
103    ) -> Result<(), StorageError<Self::Timestamp>>;
104
105    /// Update storage configuration with new parameters.
106    fn update_parameters(&self, config_params: StorageParameters);
107
108    /// Returns the [CollectionMetadata] of the collection identified by `id`.
109    fn collection_metadata(
110        &self,
111        id: GlobalId,
112    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;
113
114    /// Acquire an iterator over [CollectionMetadata] for all active
115    /// collections.
116    ///
117    /// A collection is "active" when it has a non empty frontier of read
118    /// capabilties.
119    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
120
121    /// Returns the frontiers of the identified collection.
122    fn collection_frontiers(
123        &self,
124        id: GlobalId,
125    ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
126        let frontiers = self
127            .collections_frontiers(vec![id])?
128            .expect_element(|| "known to exist");
129
130        Ok(frontiers)
131    }
132
133    /// Atomically gets and returns the frontiers of all the identified
134    /// collections.
135    fn collections_frontiers(
136        &self,
137        id: Vec<GlobalId>,
138    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;
139
140    /// Atomically gets and returns the frontiers of all active collections.
141    ///
142    /// A collection is "active" when it has a non empty frontier of read
143    /// capabilties.
144    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;
145
146    /// Checks whether a collection exists under the given `GlobalId`. Returns
147    /// an error if the collection does not exist.
148    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;
149
150    /// Returns aggregate statistics about the contents of the local input named
151    /// `id` at `as_of`.
152    async fn snapshot_stats(
153        &self,
154        id: GlobalId,
155        as_of: Antichain<Self::Timestamp>,
156    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;
157
158    /// Returns aggregate statistics about the contents of the local input named
159    /// `id` at `as_of`.
160    ///
161    /// Note that this async function itself returns a future. We may
162    /// need to block on the stats being available, but don't want to hold a reference
163    /// to the controller for too long... so the outer future holds a reference to the
164    /// controller but returns quickly, and the inner future is slow but does not
165    /// reference the controller.
166    async fn snapshot_parts_stats(
167        &self,
168        id: GlobalId,
169        as_of: Antichain<Self::Timestamp>,
170    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;
171
172    /// Returns a snapshot of the contents of collection `id` at `as_of`.
173    fn snapshot(
174        &self,
175        id: GlobalId,
176        as_of: Self::Timestamp,
177    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;
178
179    /// Returns a snapshot of the contents of collection `id` at the largest
180    /// readable `as_of`.
181    async fn snapshot_latest(
182        &self,
183        id: GlobalId,
184    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;
185
186    /// Returns a snapshot of the contents of collection `id` at `as_of`.
187    fn snapshot_cursor(
188        &self,
189        id: GlobalId,
190        as_of: Self::Timestamp,
191    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
192    where
193        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;
194
195    /// Generates a snapshot of the contents of collection `id` at `as_of` and
196    /// streams out all of the updates in bounded memory.
197    ///
198    /// The output is __not__ consolidated.
199    fn snapshot_and_stream(
200        &self,
201        id: GlobalId,
202        as_of: Self::Timestamp,
203    ) -> BoxFuture<
204        'static,
205        Result<
206            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
207            StorageError<Self::Timestamp>,
208        >,
209    >;
210
211    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
212    /// updates for the provided [`GlobalId`].
213    fn create_update_builder(
214        &self,
215        id: GlobalId,
216    ) -> BoxFuture<
217        'static,
218        Result<
219            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
220            StorageError<Self::Timestamp>,
221        >,
222    >
223    where
224        Self::Timestamp: Lattice + Codec64;
225
226    /// Update the given [`StorageTxn`] with the appropriate metadata given the
227    /// IDs to add and drop.
228    ///
229    /// The data modified in the `StorageTxn` must be made available in all
230    /// subsequent calls that require [`StorageMetadata`] as a parameter.
231    async fn prepare_state(
232        &self,
233        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
234        ids_to_add: BTreeSet<GlobalId>,
235        ids_to_drop: BTreeSet<GlobalId>,
236        ids_to_register: BTreeMap<GlobalId, ShardId>,
237    ) -> Result<(), StorageError<Self::Timestamp>>;
238
239    /// Create the collections described by the individual
240    /// [CollectionDescriptions](CollectionDescription).
241    ///
242    /// Each command carries the source id, the source description, and any
243    /// associated metadata needed to ingest the particular source.
244    ///
245    /// This command installs collection state for the indicated sources, and
246    /// they are now valid to use in queries at times beyond the initial `since`
247    /// frontiers. Each collection also acquires a read capability at this
248    /// frontier, which will need to be repeatedly downgraded with
249    /// `allow_compaction()` to permit compaction.
250    ///
251    /// This method is NOT idempotent; It can fail between processing of
252    /// different collections and leave the [StorageCollections] in an
253    /// inconsistent state. It is almost always wrong to do anything but abort
254    /// the process on `Err`.
255    ///
256    /// The `register_ts` is used as the initial timestamp that tables are
257    /// available for reads. (We might later give non-tables the same treatment,
258    /// but hold off on that initially.) Callers must provide a Some if any of
259    /// the collections is a table. A None may be given if none of the
260    /// collections are a table (i.e. all materialized views, sources, etc).
261    ///
262    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
263    /// from the txn-wal sub-system.
264    async fn create_collections_for_bootstrap(
265        &self,
266        storage_metadata: &StorageMetadata,
267        register_ts: Option<Self::Timestamp>,
268        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
269        migrated_storage_collections: &BTreeSet<GlobalId>,
270    ) -> Result<(), StorageError<Self::Timestamp>>;
271
272    /// Alters the identified ingestion to use the provided [`SourceDesc`].
273    ///
274    /// NOTE: Ideally, [StorageCollections] would not care about these, but we
275    /// have to learn about changes such that when new subsources are created we
276    /// can correctly determine a since based on its depenencies' sinces. This
277    /// is really only relevant because newly created subsources depend on the
278    /// remap shard, and we can't just have them start at since 0.
279    async fn alter_ingestion_source_desc(
280        &self,
281        ingestion_id: GlobalId,
282        source_desc: SourceDesc,
283    ) -> Result<(), StorageError<Self::Timestamp>>;
284
285    /// Alters the data config for the specified source exports of the specified ingestions.
286    async fn alter_ingestion_export_data_configs(
287        &self,
288        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
289    ) -> Result<(), StorageError<Self::Timestamp>>;
290
291    /// Alters each identified collection to use the correlated
292    /// [`GenericSourceConnection`].
293    ///
294    /// See NOTE on [StorageCollections::alter_ingestion_source_desc].
295    async fn alter_ingestion_connections(
296        &self,
297        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
298    ) -> Result<(), StorageError<Self::Timestamp>>;
299
300    /// Updates the [`RelationDesc`] for the specified table.
301    async fn alter_table_desc(
302        &self,
303        existing_collection: GlobalId,
304        new_collection: GlobalId,
305        new_desc: RelationDesc,
306        expected_version: RelationVersion,
307    ) -> Result<(), StorageError<Self::Timestamp>>;
308
309    /// Drops the read capability for the sources and allows their resources to
310    /// be reclaimed.
311    ///
312    /// TODO(jkosh44): This method does not validate the provided identifiers.
313    /// Currently when the controller starts/restarts it has no durable state.
314    /// That means that it has no way of remembering any past commands sent. In
315    /// the future we plan on persisting state for the controller so that it is
316    /// aware of past commands. Therefore this method is for dropping sources
317    /// that we know to have been previously created, but have been forgotten by
318    /// the controller due to a restart. Once command history becomes durable we
319    /// can remove this method and use the normal `drop_sources`.
320    fn drop_collections_unvalidated(
321        &self,
322        storage_metadata: &StorageMetadata,
323        identifiers: Vec<GlobalId>,
324    );
325
326    /// Assigns a read policy to specific identifiers.
327    ///
328    /// The policies are assigned in the order presented, and repeated
329    /// identifiers should conclude with the last policy. Changing a policy will
330    /// immediately downgrade the read capability if appropriate, but it will
331    /// not "recover" the read capability if the prior capability is already
332    /// ahead of it.
333    ///
334    /// This [StorageCollections] may include its own overrides on these
335    /// policies.
336    ///
337    /// Identifiers not present in `policies` retain their existing read
338    /// policies.
339    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);
340
341    /// Acquires and returns the earliest possible read holds for the specified
342    /// collections.
343    fn acquire_read_holds(
344        &self,
345        desired_holds: Vec<GlobalId>,
346    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;
347
348    /// Get the time dependence for a storage collection. Returns no value if unknown or if
349    /// the object isn't managed by storage.
350    fn determine_time_dependence(
351        &self,
352        id: GlobalId,
353    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
354}
355
356/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
357/// consolidated form.
358pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
359    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
360    // least as long as the cursor itself, which holds part leases. Bundling them together!
361    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
362    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
363}
364
365impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
366    pub async fn next(
367        &mut self,
368    ) -> Option<
369        impl Iterator<
370            Item = (
371                (Result<SourceData, String>, Result<(), String>),
372                T,
373                StorageDiff,
374            ),
375        > + Sized
376        + '_,
377    > {
378        self.cursor.next().await
379    }
380}
381
382/// Frontiers of the collection identified by `id`.
383#[derive(Debug)]
384pub struct CollectionFrontiers<T> {
385    /// The [GlobalId] of the collection that these frontiers belong to.
386    pub id: GlobalId,
387
388    /// The upper/write frontier of the collection.
389    pub write_frontier: Antichain<T>,
390
391    /// The since frontier that is implied by the collection's existence,
392    /// disregarding any read holds.
393    ///
394    /// Concretely, it is the since frontier that is implied by the combination
395    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
396    /// derived from the write frontier using the [ReadPolicy].
397    pub implied_capability: Antichain<T>,
398
399    /// The frontier of all oustanding [ReadHolds](ReadHold). This includes the
400    /// implied capability.
401    pub read_capabilities: Antichain<T>,
402}
403
404/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
405/// background task for doing work concurrently, in the background.
406#[derive(Debug, Clone)]
407pub struct StorageCollectionsImpl<
408    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
409> {
410    /// The fencing token for this instance of [StorageCollections], and really
411    /// all of the controllers and Coordinator.
412    envd_epoch: NonZeroI64,
413
414    /// Whether or not this [StorageCollections] is in read-only mode.
415    ///
416    /// When in read-only mode, we are not allowed to affect changes to external
417    /// systems, including, for example, acquiring and downgrading critical
418    /// [SinceHandles](SinceHandle)
419    read_only: bool,
420
421    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
422    /// been persisted by the caller of [StorageCollections::prepare_state].
423    finalizable_shards: Arc<ShardIdSet>,
424
425    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
426    /// shards here until we are given a chance to let our callers know that
427    /// these have been finalized, for example via
428    /// [StorageCollections::prepare_state].
429    finalized_shards: Arc<ShardIdSet>,
430
431    /// Collections maintained by this [StorageCollections].
432    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
433
434    /// A shared TxnsCache running in a task and communicated with over a channel.
435    txns_read: TxnsRead<T>,
436
437    /// Storage configuration parameters.
438    config: Arc<Mutex<StorageConfiguration>>,
439
440    /// The upper of the txn shard as it was when we booted. We forward the
441    /// upper of created/registered tables to make sure that their uppers are at
442    /// least not less than the initially known txn upper.
443    ///
444    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
445    /// existing indexes when bootstrapping, where tables that have an upper
446    /// that is less than the initially known txn upper can lead to indexes that
447    /// cannot hydrate in read-only mode.
448    initial_txn_upper: Antichain<T>,
449
450    /// The persist location where all storage collections are being written to
451    persist_location: PersistLocation,
452
453    /// A persist client used to write to storage collections
454    persist: Arc<PersistClientCache>,
455
456    /// For sending commands to our internal task.
457    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
458
459    /// For sending updates about read holds to our internal task.
460    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
461
462    /// Handles to tasks we own, making sure they're dropped when we are.
463    _background_task: Arc<AbortOnDropHandle<()>>,
464    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
465}
466
467// Supporting methods for implementing [StorageCollections].
468//
469// Almost all internal methods that are the backing implementation for a trait
470// method have the `_inner` suffix.
471//
472// We follow a pattern where `_inner` methods get a mutable reference to the
473// shared collections state, and it's the public-facing method that locks the
474// state for the duration of its invocation. This allows calling other `_inner`
475// methods from within `_inner` methods.
476impl<T> StorageCollectionsImpl<T>
477where
478    T: TimelyTimestamp
479        + Lattice
480        + Codec64
481        + From<EpochMillis>
482        + TimestampManipulation
483        + Into<mz_repr::Timestamp>
484        + Sync,
485{
486    /// Creates and returns a new [StorageCollections].
487    ///
488    /// Note that when creating a new [StorageCollections], you must also
489    /// reconcile it with the previous state using
490    /// [StorageCollections::initialize_state],
491    /// [StorageCollections::prepare_state], and
492    /// [StorageCollections::create_collections_for_bootstrap].
493    pub async fn new(
494        persist_location: PersistLocation,
495        persist_clients: Arc<PersistClientCache>,
496        metrics_registry: &MetricsRegistry,
497        _now: NowFn,
498        txns_metrics: Arc<TxnMetrics>,
499        envd_epoch: NonZeroI64,
500        read_only: bool,
501        connection_context: ConnectionContext,
502        txn: &dyn StorageTxn<T>,
503    ) -> Self {
504        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);
505
506        // This value must be already installed because we must ensure it's
507        // durably recorded before it is used, otherwise we risk leaking persist
508        // state.
509        let txns_id = txn
510            .get_txn_wal_shard()
511            .expect("must call prepare initialization before creating StorageCollections");
512
513        let txns_client = persist_clients
514            .open(persist_location.clone())
515            .await
516            .expect("location should be valid");
517
518        // We have to initialize, so that TxnsRead::start() below does not
519        // block.
520        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
521            TxnsHandle::open(
522                T::minimum(),
523                txns_client.clone(),
524                txns_client.dyncfgs().clone(),
525                Arc::clone(&txns_metrics),
526                txns_id,
527            )
528            .await;
529
530        // For handing to the background task, for listening to upper updates.
531        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
532        let mut txns_write = txns_client
533            .open_writer(
534                txns_id,
535                Arc::new(txns_key_schema),
536                Arc::new(txns_val_schema),
537                Diagnostics {
538                    shard_name: "txns".to_owned(),
539                    handle_purpose: "commit txns".to_owned(),
540                },
541            )
542            .await
543            .expect("txns schema shouldn't change");
544
545        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
546
547        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
548        let finalizable_shards =
549            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
550        let finalized_shards =
551            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
552        let config = Arc::new(Mutex::new(StorageConfiguration::new(
553            connection_context,
554            mz_dyncfgs::all_dyncfgs(),
555        )));
556
557        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();
558
559        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
560        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
561        let mut background_task = BackgroundTask {
562            config: Arc::clone(&config),
563            cmds_tx: cmd_tx.clone(),
564            cmds_rx: cmd_rx,
565            holds_rx,
566            collections: Arc::clone(&collections),
567            finalizable_shards: Arc::clone(&finalizable_shards),
568            shard_by_id: BTreeMap::new(),
569            since_handles: BTreeMap::new(),
570            txns_handle: Some(txns_write),
571            txns_shards: Default::default(),
572        };
573
574        let background_task =
575            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
576                background_task.run().await
577            });
578
579        let finalize_shards_task = mz_ore::task::spawn(
580            || "storage_collections::finalize_shards_task",
581            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
582                envd_epoch: envd_epoch.clone(),
583                config: Arc::clone(&config),
584                metrics,
585                finalizable_shards: Arc::clone(&finalizable_shards),
586                finalized_shards: Arc::clone(&finalized_shards),
587                persist_location: persist_location.clone(),
588                persist: Arc::clone(&persist_clients),
589                read_only,
590            }),
591        );
592
593        Self {
594            finalizable_shards,
595            finalized_shards,
596            collections,
597            txns_read,
598            envd_epoch,
599            read_only,
600            config,
601            initial_txn_upper,
602            persist_location,
603            persist: persist_clients,
604            cmd_tx,
605            holds_tx,
606            _background_task: Arc::new(background_task.abort_on_drop()),
607            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
608        }
609    }
610
611    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
612    ///
613    /// `since` is an optional since that the read handle will be forwarded to
614    /// if it is less than its current since.
615    ///
616    /// This will `halt!` the process if we cannot successfully acquire a
617    /// critical handle with our current epoch.
618    async fn open_data_handles(
619        &self,
620        id: &GlobalId,
621        shard: ShardId,
622        since: Option<&Antichain<T>>,
623        relation_desc: RelationDesc,
624        persist_client: &PersistClient,
625    ) -> (
626        WriteHandle<SourceData, (), T, StorageDiff>,
627        SinceHandleWrapper<T>,
628    ) {
629        let since_handle = if self.read_only {
630            let read_handle = self
631                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
632                .await;
633            SinceHandleWrapper::Leased(read_handle)
634        } else {
635            // We're managing the data for this shard in read-write mode, which would fence out other
636            // processes in read-only mode; it's safe to upgrade the metadata version.
637            persist_client
638                .upgrade_version::<SourceData, (), T, StorageDiff>(
639                    shard,
640                    Diagnostics {
641                        shard_name: id.to_string(),
642                        handle_purpose: format!("controller data for {}", id),
643                    },
644                )
645                .await
646                .expect("invalid persist usage");
647
648            let since_handle = self
649                .open_critical_handle(id, shard, since, persist_client)
650                .await;
651
652            SinceHandleWrapper::Critical(since_handle)
653        };
654
655        let mut write_handle = self
656            .open_write_handle(id, shard, relation_desc, persist_client)
657            .await;
658
659        // N.B.
660        // Fetch the most recent upper for the write handle. Otherwise, this may
661        // be behind the since of the since handle. Its vital this happens AFTER
662        // we create the since handle as it needs to be linearized with that
663        // operation. It may be true that creating the write handle after the
664        // since handle already ensures this, but we do this out of an abundance
665        // of caution.
666        //
667        // Note that this returns the upper, but also sets it on the handle to
668        // be fetched later.
669        write_handle.fetch_recent_upper().await;
670
671        (write_handle, since_handle)
672    }
673
674    /// Opens a write handle for the given `shard`.
675    async fn open_write_handle(
676        &self,
677        id: &GlobalId,
678        shard: ShardId,
679        relation_desc: RelationDesc,
680        persist_client: &PersistClient,
681    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
682        let diagnostics = Diagnostics {
683            shard_name: id.to_string(),
684            handle_purpose: format!("controller data for {}", id),
685        };
686
687        let write = persist_client
688            .open_writer(
689                shard,
690                Arc::new(relation_desc),
691                Arc::new(UnitSchema),
692                diagnostics.clone(),
693            )
694            .await
695            .expect("invalid persist usage");
696
697        write
698    }
699
700    /// Opens a critical since handle for the given `shard`.
701    ///
702    /// `since` is an optional since that the read handle will be forwarded to
703    /// if it is less than its current since.
704    ///
705    /// This will `halt!` the process if we cannot successfully acquire a
706    /// critical handle with our current epoch.
707    async fn open_critical_handle(
708        &self,
709        id: &GlobalId,
710        shard: ShardId,
711        since: Option<&Antichain<T>>,
712        persist_client: &PersistClient,
713    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
714        tracing::debug!(%id, ?since, "opening critical handle");
715
716        assert!(
717            !self.read_only,
718            "attempting to open critical SinceHandle in read-only mode"
719        );
720
721        let diagnostics = Diagnostics {
722            shard_name: id.to_string(),
723            handle_purpose: format!("controller data for {}", id),
724        };
725
726        // Construct the handle in a separate block to ensure all error paths
727        // are diverging
728        let since_handle = {
729            // This block's aim is to ensure the handle is in terms of our epoch
730            // by the time we return it.
731            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
732                .open_critical_since(
733                    shard,
734                    PersistClient::CONTROLLER_CRITICAL_SINCE,
735                    diagnostics.clone(),
736                )
737                .await
738                .expect("invalid persist usage");
739
740            // Take the join of the handle's since and the provided `since`;
741            // this lets materialized views express the since at which their
742            // read handles "start."
743            let provided_since = match since {
744                Some(since) => since,
745                None => &Antichain::from_elem(T::minimum()),
746            };
747            let since = handle.since().join(provided_since);
748
749            let our_epoch = self.envd_epoch;
750
751            loop {
752                let current_epoch: PersistEpoch = handle.opaque().clone();
753
754                // Ensure the current epoch is <= our epoch.
755                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);
756
757                if unchecked_success {
758                    // Update the handle's state so that it is in terms of our
759                    // epoch.
760                    let checked_success = handle
761                        .compare_and_downgrade_since(
762                            &current_epoch,
763                            (&PersistEpoch::from(our_epoch), &since),
764                        )
765                        .await
766                        .is_ok();
767                    if checked_success {
768                        break handle;
769                    }
770                } else {
771                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
772                }
773            }
774        };
775
776        since_handle
777    }
778
779    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
780    /// for the given `shard`.
781    ///
782    /// `since` is an optional since that the read handle will be forwarded to
783    /// if it is less than its current since.
784    async fn open_leased_handle(
785        &self,
786        id: &GlobalId,
787        shard: ShardId,
788        relation_desc: RelationDesc,
789        since: Option<&Antichain<T>>,
790        persist_client: &PersistClient,
791    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
792        tracing::debug!(%id, ?since, "opening leased handle");
793
794        let diagnostics = Diagnostics {
795            shard_name: id.to_string(),
796            handle_purpose: format!("controller data for {}", id),
797        };
798
799        let use_critical_since = false;
800        let mut handle: ReadHandle<_, _, _, _> = persist_client
801            .open_leased_reader(
802                shard,
803                Arc::new(relation_desc),
804                Arc::new(UnitSchema),
805                diagnostics.clone(),
806                use_critical_since,
807            )
808            .await
809            .expect("invalid persist usage");
810
811        // Take the join of the handle's since and the provided `since`;
812        // this lets materialized views express the since at which their
813        // read handles "start."
814        let provided_since = match since {
815            Some(since) => since,
816            None => &Antichain::from_elem(T::minimum()),
817        };
818        let since = handle.since().join(provided_since);
819
820        handle.downgrade_since(&since).await;
821
822        handle
823    }
824
825    fn register_handles(
826        &self,
827        id: GlobalId,
828        is_in_txns: bool,
829        since_handle: SinceHandleWrapper<T>,
830        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
831    ) {
832        self.send(BackgroundCmd::Register {
833            id,
834            is_in_txns,
835            since_handle,
836            write_handle,
837        });
838    }
839
840    fn send(&self, cmd: BackgroundCmd<T>) {
841        let _ = self.cmd_tx.send(cmd);
842    }
843
844    async fn snapshot_stats_inner(
845        &self,
846        id: GlobalId,
847        as_of: SnapshotStatsAsOf<T>,
848    ) -> Result<SnapshotStats, StorageError<T>> {
849        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
850        // caller of this one drives it to completion.
851        //
852        // We'd need to either share the critical handle somehow or maybe have
853        // two instances around, one in the worker and one in the
854        // StorageCollections.
855        let (tx, rx) = oneshot::channel();
856        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
857        rx.await.expect("BackgroundTask should be live").0.await
858    }
859
860    /// If this identified collection has a dependency, install a read hold on
861    /// it.
862    ///
863    /// This is necessary to ensure that the dependency's since does not advance
864    /// beyond its dependents'.
865    fn install_collection_dependency_read_holds_inner(
866        &self,
867        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
868        id: GlobalId,
869    ) -> Result<(), StorageError<T>> {
870        let (deps, collection_implied_capability) = match self_collections.get(&id) {
871            Some(CollectionState {
872                storage_dependencies: deps,
873                implied_capability,
874                ..
875            }) => (deps.clone(), implied_capability),
876            _ => return Ok(()),
877        };
878
879        for dep in deps.iter() {
880            let dep_collection = self_collections
881                .get(dep)
882                .ok_or(StorageError::IdentifierMissing(id))?;
883
884            mz_ore::soft_assert_or_log!(
885                PartialOrder::less_equal(
886                    &dep_collection.implied_capability,
887                    collection_implied_capability
888                ),
889                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
890                dep_collection.implied_capability,
891                collection_implied_capability,
892            );
893        }
894
895        self.install_read_capabilities_inner(
896            self_collections,
897            id,
898            &deps,
899            collection_implied_capability.clone(),
900        )?;
901
902        Ok(())
903    }
904
905    /// Determine if this collection has another dependency.
906    ///
907    /// Currently, collections have either 0 or 1 dependencies.
908    fn determine_collection_dependencies(
909        &self,
910        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
911        source_id: GlobalId,
912        data_source: &DataSource<T>,
913    ) -> Result<Vec<GlobalId>, StorageError<T>> {
914        let dependencies = match &data_source {
915            DataSource::Introspection(_)
916            | DataSource::Webhook
917            | DataSource::Table { primary: None }
918            | DataSource::Progress
919            | DataSource::Other => Vec::new(),
920            DataSource::Table {
921                primary: Some(primary),
922            } => vec![*primary],
923            DataSource::IngestionExport {
924                ingestion_id,
925                data_config,
926                ..
927            } => {
928                // Ingestion exports depend on their primary source's remap
929                // collection, except when they use a CDCv2 envelope.
930                let source = self_collections
931                    .get(ingestion_id)
932                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
933                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
934                    panic!("SourceExport must refer to a primary source that already exists");
935                };
936
937                match data_config.envelope {
938                    SourceEnvelope::CdcV2 => Vec::new(),
939                    _ => vec![ingestion.remap_collection_id],
940                }
941            }
942            // Ingestions depend on their remap collection.
943            DataSource::Ingestion(ingestion) => {
944                if ingestion.remap_collection_id == source_id {
945                    vec![]
946                } else {
947                    vec![ingestion.remap_collection_id]
948                }
949            }
950            DataSource::Sink { desc } => vec![desc.sink.from],
951        };
952
953        Ok(dependencies)
954    }
955
956    /// Install read capabilities on the given `storage_dependencies`.
957    #[instrument(level = "debug")]
958    fn install_read_capabilities_inner(
959        &self,
960        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
961        from_id: GlobalId,
962        storage_dependencies: &[GlobalId],
963        read_capability: Antichain<T>,
964    ) -> Result<(), StorageError<T>> {
965        let mut changes = ChangeBatch::new();
966        for time in read_capability.iter() {
967            changes.update(time.clone(), 1);
968        }
969
970        if tracing::span_enabled!(tracing::Level::TRACE) {
971            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
972            let user_capabilities = self_collections
973                .iter_mut()
974                .filter(|(id, _c)| id.is_user())
975                .map(|(id, c)| {
976                    let updates = c.read_capabilities.updates().cloned().collect_vec();
977                    (*id, c.implied_capability.clone(), updates)
978                })
979                .collect_vec();
980
981            trace!(
982                %from_id,
983                ?storage_dependencies,
984                ?read_capability,
985                ?user_capabilities,
986                "install_read_capabilities_inner");
987        }
988
989        let mut storage_read_updates = storage_dependencies
990            .iter()
991            .map(|id| (*id, changes.clone()))
992            .collect();
993
994        StorageCollectionsImpl::update_read_capabilities_inner(
995            &self.cmd_tx,
996            self_collections,
997            &mut storage_read_updates,
998        );
999
1000        if tracing::span_enabled!(tracing::Level::TRACE) {
1001            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
1002            let user_capabilities = self_collections
1003                .iter_mut()
1004                .filter(|(id, _c)| id.is_user())
1005                .map(|(id, c)| {
1006                    let updates = c.read_capabilities.updates().cloned().collect_vec();
1007                    (*id, c.implied_capability.clone(), updates)
1008                })
1009                .collect_vec();
1010
1011            trace!(
1012                %from_id,
1013                ?storage_dependencies,
1014                ?read_capability,
1015                ?user_capabilities,
1016                "after install_read_capabilities_inner!");
1017        }
1018
1019        Ok(())
1020    }
1021
1022    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
1023        let metadata = &self.collection_metadata(id)?;
1024        let persist_client = self
1025            .persist
1026            .open(metadata.persist_location.clone())
1027            .await
1028            .unwrap();
1029        // Duplicate part of open_data_handles here because we don't need the
1030        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
1031        let diagnostics = Diagnostics {
1032            shard_name: id.to_string(),
1033            handle_purpose: format!("controller data for {}", id),
1034        };
1035        // NB: Opening a WriteHandle is cheap if it's never used in a
1036        // compare_and_append operation.
1037        let write = persist_client
1038            .open_writer::<SourceData, (), T, StorageDiff>(
1039                metadata.data_shard,
1040                Arc::new(metadata.relation_desc.clone()),
1041                Arc::new(UnitSchema),
1042                diagnostics.clone(),
1043            )
1044            .await
1045            .expect("invalid persist usage");
1046        Ok(write.shared_upper())
1047    }
1048
1049    async fn read_handle_for_snapshot(
1050        persist: Arc<PersistClientCache>,
1051        metadata: &CollectionMetadata,
1052        id: GlobalId,
1053    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1054        let persist_client = persist
1055            .open(metadata.persist_location.clone())
1056            .await
1057            .unwrap();
1058
1059        // We create a new read handle every time someone requests a snapshot
1060        // and then immediately expire it instead of keeping a read handle
1061        // permanently in our state to avoid having it heartbeat continually.
1062        // The assumption is that calls to snapshot are rare and therefore worth
1063        // it to always create a new handle.
1064        let read_handle = persist_client
1065            .open_leased_reader::<SourceData, (), _, _>(
1066                metadata.data_shard,
1067                Arc::new(metadata.relation_desc.clone()),
1068                Arc::new(UnitSchema),
1069                Diagnostics {
1070                    shard_name: id.to_string(),
1071                    handle_purpose: format!("snapshot {}", id),
1072                },
1073                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1074            )
1075            .await
1076            .expect("invalid persist usage");
1077        Ok(read_handle)
1078    }
1079
1080    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1081    // where the as_of frontier might have multiple elements. In the current form the mutually
1082    // incomparable updates will be accumulated together to a state of the collection that never
1083    // actually existed. We should include the original time in the updates advanced by the as_of
1084    // frontier in the result and let the caller decide what to do with the information.
1085    fn snapshot(
1086        &self,
1087        id: GlobalId,
1088        as_of: T,
1089        txns_read: &TxnsRead<T>,
1090    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
1091    where
1092        T: Codec64 + From<EpochMillis> + TimestampManipulation,
1093    {
1094        let metadata = match self.collection_metadata(id) {
1095            Ok(metadata) => metadata.clone(),
1096            Err(e) => return async { Err(e) }.boxed(),
1097        };
1098        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1099            assert_eq!(txns_id, txns_read.txns_id());
1100            txns_read.clone()
1101        });
1102        let persist = Arc::clone(&self.persist);
1103        async move {
1104            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1105            let contents = match txns_read {
1106                None => {
1107                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1108                    read_handle
1109                        .snapshot_and_fetch(Antichain::from_elem(as_of))
1110                        .await
1111                }
1112                Some(txns_read) => {
1113                    // We _are_ using txn-wal for tables. It advances the physical upper of the
1114                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
1115                    // unblocked.
1116                    //
1117                    // Consider the following scenario:
1118                    // - Table A is written to via txns at time 5
1119                    // - Tables other than A are written to via txns consuming timestamps up to 10
1120                    // - We'd like to read A at 7
1121                    // - The application process of A's txn has advanced the upper to 5+1, but we need
1122                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
1123                    // - This branch allows it to handle that advancing the physical upper of Table A to
1124                    //   10 (NB but only once we see it get past the write at 5!)
1125                    // - Then we can read it normally.
1126                    txns_read.update_gt(as_of.clone()).await;
1127                    let data_snapshot = txns_read
1128                        .data_snapshot(metadata.data_shard, as_of.clone())
1129                        .await;
1130                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
1131                }
1132            };
1133            match contents {
1134                Ok(contents) => {
1135                    let mut snapshot = Vec::with_capacity(contents.len());
1136                    for ((data, _), _, diff) in contents {
1137                        // TODO(petrosagg): We should accumulate the errors too and let the user
1138                        // interprret the result
1139                        let row = data.expect("invalid protobuf data").0?;
1140                        snapshot.push((row, diff));
1141                    }
1142                    Ok(snapshot)
1143                }
1144                Err(_) => Err(StorageError::ReadBeforeSince(id)),
1145            }
1146        }
1147        .boxed()
1148    }
1149
1150    fn snapshot_and_stream(
1151        &self,
1152        id: GlobalId,
1153        as_of: T,
1154        txns_read: &TxnsRead<T>,
1155    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
1156    {
1157        use futures::stream::StreamExt;
1158
1159        let metadata = match self.collection_metadata(id) {
1160            Ok(metadata) => metadata.clone(),
1161            Err(e) => return async { Err(e) }.boxed(),
1162        };
1163        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1164            assert_eq!(txns_id, txns_read.txns_id());
1165            txns_read.clone()
1166        });
1167        let persist = Arc::clone(&self.persist);
1168
1169        async move {
1170            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1171            let stream = match txns_read {
1172                None => {
1173                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1174                    read_handle
1175                        .snapshot_and_stream(Antichain::from_elem(as_of))
1176                        .await
1177                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1178                        .boxed()
1179                }
1180                Some(txns_read) => {
1181                    txns_read.update_gt(as_of.clone()).await;
1182                    let data_snapshot = txns_read
1183                        .data_snapshot(metadata.data_shard, as_of.clone())
1184                        .await;
1185                    data_snapshot
1186                        .snapshot_and_stream(&mut read_handle)
1187                        .await
1188                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1189                        .boxed()
1190                }
1191            };
1192
1193            // Map our stream, unwrapping Persist internal errors.
1194            let stream = stream
1195                .map(|((k, _v), t, d)| {
1196                    // TODO(parkmycar): We should accumulate the errors and pass them on to the
1197                    // caller.
1198                    let data = k.expect("error while streaming from Persist");
1199                    (data, t, d)
1200                })
1201                .boxed();
1202            Ok(stream)
1203        }
1204        .boxed()
1205    }
1206
1207    fn set_read_policies_inner(
1208        &self,
1209        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1210        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1211    ) {
1212        trace!("set_read_policies: {:?}", policies);
1213
1214        let mut read_capability_changes = BTreeMap::default();
1215
1216        for (id, policy) in policies.into_iter() {
1217            let collection = match collections.get_mut(&id) {
1218                Some(c) => c,
1219                None => {
1220                    panic!("Reference to absent collection {id}");
1221                }
1222            };
1223
1224            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1225
1226            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1227                let mut update = ChangeBatch::new();
1228                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1229                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1230                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1231                if !update.is_empty() {
1232                    read_capability_changes.insert(id, update);
1233                }
1234            }
1235
1236            collection.read_policy = policy;
1237        }
1238
1239        for (id, changes) in read_capability_changes.iter() {
1240            if id.is_user() {
1241                trace!(%id, ?changes, "in set_read_policies, capability changes");
1242            }
1243        }
1244
1245        if !read_capability_changes.is_empty() {
1246            StorageCollectionsImpl::update_read_capabilities_inner(
1247                &self.cmd_tx,
1248                collections,
1249                &mut read_capability_changes,
1250            );
1251        }
1252    }
1253
    /// Applies the read-capability changes in `updates` to `collections`,
    /// forwarding the resulting changes transitively to each collection's
    /// storage dependencies, and sends a [`BackgroundCmd::DowngradeSince`]
    /// command for the net results over `cmd_tx`.
    ///
    /// `updates` is fully drained by this call. Collections whose read
    /// capabilities end up at the empty frontier are removed from
    /// `collections`. Tables with a primary collection emit no downgrade of
    /// their own; the primary drives compaction.
    ///
    /// This is an associated function rather than a method (it takes no
    /// `self`) so that it can be shared with the task that updates the persist
    /// handles, which also has a reference to the shared collections state.
    ///
    /// # Panics
    ///
    /// Panics if a positive update targets a collection absent from
    /// `collections`, if an update would make a read-capability count
    /// negative, if an update would install a capability before the current
    /// read-capability frontier, or if sending the command fails.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Location to record consequences that we need to act on.
        // Per collection: (net changes, latest read-capability frontier).
        let mut collections_net = BTreeMap::new();

        // We must not rely on any specific relative ordering of `GlobalId`s.
        // That said, it is reasonable to assume that collections generally have
        // greater IDs than their dependencies, so starting with the largest is
        // a useful optimization.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Continue purely negative updates. Someone has probably
                    // already dropped this collection!
                    continue;
                }
            };

            // Validate every change against the state *before* applying any of
            // them: counts must stay non-negative, and new holds must not sit
            // before the current frontier.
            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            // Apply the changes. `update` is refilled with what `update_iter`
            // yields; per timely's `MutableAntichain` docs these are the
            // resulting frontier changes (NOTE(review): confirm against the
            // timely version in use).
            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                %id,
                ?collection.storage_dependencies,
                ?update,
                "forwarding update to storage dependencies");
            }

            // Queue the same changes for every dependency. The loop variable
            // shadows `id` only inside this loop.
            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate our net compute actions into downgrades of persist sinces.
        // The actual downgrades are performed by a Tokio task asynchronously.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // If the table has a "primary" collection, let that collection drive compaction.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = !matches!(
                    collection.description.data_source,
                    DataSource::Table { primary: Some(_) }
                );

                // An empty since means nothing can be read anymore, so the
                // in-memory state is dropped.
                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
1369
1370    /// Remove any shards that we know are finalized
1371    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1372        self.finalized_shards
1373            .lock()
1374            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1375    }
1376}
1377
1378// See comments on the above impl for StorageCollectionsImpl.
1379#[async_trait]
1380impl<T> StorageCollections for StorageCollectionsImpl<T>
1381where
1382    T: TimelyTimestamp
1383        + Lattice
1384        + Codec64
1385        + From<EpochMillis>
1386        + TimestampManipulation
1387        + Into<mz_repr::Timestamp>
1388        + Sync,
1389{
1390    type Timestamp = T;
1391
1392    async fn initialize_state(
1393        &self,
1394        txn: &mut (dyn StorageTxn<T> + Send),
1395        init_ids: BTreeSet<GlobalId>,
1396    ) -> Result<(), StorageError<T>> {
1397        let metadata = txn.get_collection_metadata();
1398        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1399
1400        // Determine which collections we do not yet have metadata for.
1401        let new_collections: BTreeSet<GlobalId> =
1402            init_ids.difference(&existing_metadata).cloned().collect();
1403
1404        self.prepare_state(
1405            txn,
1406            new_collections,
1407            BTreeSet::default(),
1408            BTreeMap::default(),
1409        )
1410        .await?;
1411
1412        // All shards that belong to collections dropped in the last epoch are
1413        // eligible for finalization.
1414        //
1415        // n.b. this introduces an unlikely race condition: if a collection is
1416        // dropped from the catalog, but the dataflow is still running on a
1417        // worker, assuming the shard is safe to finalize on reboot may cause
1418        // the cluster to panic.
1419        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1420
1421        info!(?unfinalized_shards, "initializing finalizable_shards");
1422
1423        self.finalizable_shards.lock().extend(unfinalized_shards);
1424
1425        Ok(())
1426    }
1427
1428    fn update_parameters(&self, config_params: StorageParameters) {
1429        // We serialize the dyncfg updates in StorageParameters, but configure
1430        // persist separately.
1431        config_params.dyncfg_updates.apply(self.persist.cfg());
1432
1433        self.config
1434            .lock()
1435            .expect("lock poisoned")
1436            .update(config_params);
1437    }
1438
1439    fn collection_metadata(
1440        &self,
1441        id: GlobalId,
1442    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
1443        let collections = self.collections.lock().expect("lock poisoned");
1444
1445        collections
1446            .get(&id)
1447            .map(|c| c.collection_metadata.clone())
1448            .ok_or(StorageError::IdentifierMissing(id))
1449    }
1450
1451    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1452        let collections = self.collections.lock().expect("lock poisoned");
1453
1454        collections
1455            .iter()
1456            .filter(|(_id, c)| !c.is_dropped())
1457            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1458            .collect()
1459    }
1460
1461    fn collections_frontiers(
1462        &self,
1463        ids: Vec<GlobalId>,
1464    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
1465        if ids.is_empty() {
1466            return Ok(vec![]);
1467        }
1468
1469        let collections = self.collections.lock().expect("lock poisoned");
1470
1471        let res = ids
1472            .into_iter()
1473            .map(|id| {
1474                collections
1475                    .get(&id)
1476                    .map(|c| CollectionFrontiers {
1477                        id: id.clone(),
1478                        write_frontier: c.write_frontier.clone(),
1479                        implied_capability: c.implied_capability.clone(),
1480                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1481                    })
1482                    .ok_or(StorageError::IdentifierMissing(id))
1483            })
1484            .collect::<Result<Vec<_>, _>>()?;
1485
1486        Ok(res)
1487    }
1488
1489    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1490        let collections = self.collections.lock().expect("lock poisoned");
1491
1492        let res = collections
1493            .iter()
1494            .filter(|(_id, c)| !c.is_dropped())
1495            .map(|(id, c)| CollectionFrontiers {
1496                id: id.clone(),
1497                write_frontier: c.write_frontier.clone(),
1498                implied_capability: c.implied_capability.clone(),
1499                read_capabilities: c.read_capabilities.frontier().to_owned(),
1500            })
1501            .collect_vec();
1502
1503        res
1504    }
1505
1506    async fn snapshot_stats(
1507        &self,
1508        id: GlobalId,
1509        as_of: Antichain<Self::Timestamp>,
1510    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1511        let metadata = self.collection_metadata(id)?;
1512
1513        // See the comments in StorageController::snapshot for what's going on
1514        // here.
1515        let as_of = match metadata.txns_shard.as_ref() {
1516            None => SnapshotStatsAsOf::Direct(as_of),
1517            Some(txns_id) => {
1518                assert_eq!(txns_id, self.txns_read.txns_id());
1519                let as_of = as_of
1520                    .into_option()
1521                    .expect("cannot read as_of the empty antichain");
1522                self.txns_read.update_gt(as_of.clone()).await;
1523                let data_snapshot = self
1524                    .txns_read
1525                    .data_snapshot(metadata.data_shard, as_of.clone())
1526                    .await;
1527                SnapshotStatsAsOf::Txns(data_snapshot)
1528            }
1529        };
1530        self.snapshot_stats_inner(id, as_of).await
1531    }
1532
1533    async fn snapshot_parts_stats(
1534        &self,
1535        id: GlobalId,
1536        as_of: Antichain<Self::Timestamp>,
1537    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
1538        let metadata = {
1539            let self_collections = self.collections.lock().expect("lock poisoned");
1540
1541            let collection_metadata = self_collections
1542                .get(&id)
1543                .ok_or(StorageError::IdentifierMissing(id))
1544                .map(|c| c.collection_metadata.clone());
1545
1546            match collection_metadata {
1547                Ok(m) => m,
1548                Err(e) => return Box::pin(async move { Err(e) }),
1549            }
1550        };
1551
1552        // See the comments in StorageController::snapshot for what's going on
1553        // here.
1554        let persist = Arc::clone(&self.persist);
1555        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;
1556
1557        let data_snapshot = match (metadata, as_of.as_option()) {
1558            (
1559                CollectionMetadata {
1560                    txns_shard: Some(txns_id),
1561                    data_shard,
1562                    ..
1563                },
1564                Some(as_of),
1565            ) => {
1566                assert_eq!(txns_id, *self.txns_read.txns_id());
1567                self.txns_read.update_gt(as_of.clone()).await;
1568                let data_snapshot = self
1569                    .txns_read
1570                    .data_snapshot(data_shard, as_of.clone())
1571                    .await;
1572                Some(data_snapshot)
1573            }
1574            _ => None,
1575        };
1576
1577        Box::pin(async move {
1578            let read_handle = read_handle?;
1579            let result = match data_snapshot {
1580                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
1581                None => read_handle.snapshot_parts_stats(as_of).await,
1582            };
1583            read_handle.expire().await;
1584            result.map_err(|_| StorageError::ReadBeforeSince(id))
1585        })
1586    }
1587
1588    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1589    // where the as_of frontier might have multiple elements. In the current form the mutually
1590    // incomparable updates will be accumulated together to a state of the collection that never
1591    // actually existed. We should include the original time in the updates advanced by the as_of
1592    // frontier in the result and let the caller decide what to do with the information.
1593    fn snapshot(
1594        &self,
1595        id: GlobalId,
1596        as_of: Self::Timestamp,
1597    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
1598        self.snapshot(id, as_of, &self.txns_read)
1599    }
1600
1601    async fn snapshot_latest(
1602        &self,
1603        id: GlobalId,
1604    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1605        let upper = self.recent_upper(id).await?;
1606        let res = match upper.as_option() {
1607            Some(f) if f > &T::minimum() => {
1608                let as_of = f.step_back().unwrap();
1609
1610                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1611                snapshot
1612                    .into_iter()
1613                    .map(|(row, diff)| {
1614                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1615                        row
1616                    })
1617                    .collect()
1618            }
1619            Some(_min) => {
1620                // The collection must be empty!
1621                Vec::new()
1622            }
1623            // The collection is closed, we cannot determine a latest read
1624            // timestamp based on the upper.
1625            _ => {
1626                return Err(StorageError::InvalidUsage(
1627                    "collection closed, cannot determine a read timestamp based on the upper"
1628                        .to_string(),
1629                ));
1630            }
1631        };
1632
1633        Ok(res)
1634    }
1635
1636    fn snapshot_cursor(
1637        &self,
1638        id: GlobalId,
1639        as_of: Self::Timestamp,
1640    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
1641    where
1642        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
1643    {
1644        let metadata = match self.collection_metadata(id) {
1645            Ok(metadata) => metadata.clone(),
1646            Err(e) => return async { Err(e) }.boxed(),
1647        };
1648        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1649            // Ensure the txn's shard the controller has is the same that this
1650            // collection is registered to.
1651            assert_eq!(txns_id, self.txns_read.txns_id());
1652            self.txns_read.clone()
1653        });
1654        let persist = Arc::clone(&self.persist);
1655
1656        // See the comments in Self::snapshot for what's going on here.
1657        async move {
1658            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1659            let cursor = match txns_read {
1660                None => {
1661                    let cursor = handle
1662                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1663                        .await
1664                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1665                    SnapshotCursor {
1666                        _read_handle: handle,
1667                        cursor,
1668                    }
1669                }
1670                Some(txns_read) => {
1671                    txns_read.update_gt(as_of.clone()).await;
1672                    let data_snapshot = txns_read
1673                        .data_snapshot(metadata.data_shard, as_of.clone())
1674                        .await;
1675                    let cursor = data_snapshot
1676                        .snapshot_cursor(&mut handle, |_| true)
1677                        .await
1678                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1679                    SnapshotCursor {
1680                        _read_handle: handle,
1681                        cursor,
1682                    }
1683                }
1684            };
1685
1686            Ok(cursor)
1687        }
1688        .boxed()
1689    }
1690
1691    fn snapshot_and_stream(
1692        &self,
1693        id: GlobalId,
1694        as_of: Self::Timestamp,
1695    ) -> BoxFuture<
1696        'static,
1697        Result<
1698            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
1699            StorageError<Self::Timestamp>,
1700        >,
1701    >
1702    where
1703        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
1704    {
1705        self.snapshot_and_stream(id, as_of, &self.txns_read)
1706    }
1707
1708    fn create_update_builder(
1709        &self,
1710        id: GlobalId,
1711    ) -> BoxFuture<
1712        'static,
1713        Result<
1714            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1715            StorageError<Self::Timestamp>,
1716        >,
1717    > {
1718        let metadata = match self.collection_metadata(id) {
1719            Ok(m) => m,
1720            Err(e) => return Box::pin(async move { Err(e) }),
1721        };
1722        let persist = Arc::clone(&self.persist);
1723
1724        async move {
1725            let persist_client = persist
1726                .open(metadata.persist_location.clone())
1727                .await
1728                .expect("invalid persist usage");
1729            let write_handle = persist_client
1730                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1731                    metadata.data_shard,
1732                    Arc::new(metadata.relation_desc.clone()),
1733                    Arc::new(UnitSchema),
1734                    Diagnostics {
1735                        shard_name: id.to_string(),
1736                        handle_purpose: format!("create write batch {}", id),
1737                    },
1738                )
1739                .await
1740                .expect("invalid persist usage");
1741            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1742
1743            Ok(builder)
1744        }
1745        .boxed()
1746    }
1747
1748    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1749        let collections = self.collections.lock().expect("lock poisoned");
1750
1751        if collections.contains_key(&id) {
1752            Ok(())
1753        } else {
1754            Err(StorageError::IdentifierMissing(id))
1755        }
1756    }
1757
1758    async fn prepare_state(
1759        &self,
1760        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1761        ids_to_add: BTreeSet<GlobalId>,
1762        ids_to_drop: BTreeSet<GlobalId>,
1763        ids_to_register: BTreeMap<GlobalId, ShardId>,
1764    ) -> Result<(), StorageError<T>> {
1765        txn.insert_collection_metadata(
1766            ids_to_add
1767                .into_iter()
1768                .map(|id| (id, ShardId::new()))
1769                .collect(),
1770        )?;
1771        txn.insert_collection_metadata(ids_to_register)?;
1772
1773        // Delete the metadata for any dropped collections.
1774        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1775
1776        let dropped_shards = dropped_mappings
1777            .into_iter()
1778            .map(|(_id, shard)| shard)
1779            .collect();
1780
1781        txn.insert_unfinalized_shards(dropped_shards)?;
1782
1783        // Reconcile any shards we've successfully finalized with the shard
1784        // finalization collection.
1785        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1786        txn.mark_shards_as_finalized(finalized_shards);
1787
1788        Ok(())
1789    }
1790
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    //
    // Registers `collections` during bootstrap. In order, it:
    // 1. validates the requested ids and descriptions;
    // 2. opens persist write and since handles for every collection (at most
    //    50 concurrently);
    // 3. advances the since of tables that are still at the minimum timestamp
    //    (and are not migrated) to `register_ts`;
    // 4. installs each collection's state, its handles, and read holds on its
    //    storage dependencies.
    //
    // Panics if a table is passed without a `register_ts`.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // A collection is treated as written through txn-wal when its metadata
        // names a txns shard, unless we are in read-only mode and the
        // collection is a migrated one.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. Callers must not:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Exact duplicates within `collections` are removed by `dedup`; the
        // same id appearing with different descriptions is rejected.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            // Early sanity check: if we knew about a collection already, its
            // description must match!
            //
            // NOTE: There could be concurrent modifications to
            // `self.collections`, but this sanity check is better than nothing.
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        // We first enrich each collection description with some additional
        // metadata (its data shard and, for txn-wal collections, the txns
        // shard). Lookup errors are kept per item and surfaced in the stream
        // below.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // A single persist client, shared so that we can open `SinceHandle`s
        // for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Take a shared reference to `self` so that the futures processed
        // concurrently in the stream below can all borrow it.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table { .. } => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            // Only a since still at the minimum timestamp is
                            // advanced, and never for migrated collections.
                            // The result of the downgrade is ignored.
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies = self.determine_collection_dependencies(
                &*self_collections,
                id,
                &description.data_source,
            )?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far as the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the source to contain this export.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table { .. } => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            // Hand the persist handles over, noting whether the collection is
            // written through txn-wal.
            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2134
2135    async fn alter_ingestion_source_desc(
2136        &self,
2137        ingestion_id: GlobalId,
2138        source_desc: SourceDesc,
2139    ) -> Result<(), StorageError<Self::Timestamp>> {
2140        // The StorageController checks the validity of these. And we just
2141        // accept them.
2142
2143        let mut self_collections = self.collections.lock().expect("lock poisoned");
2144        let collection = self_collections
2145            .get_mut(&ingestion_id)
2146            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
2147
2148        let curr_ingestion = match &mut collection.description.data_source {
2149            DataSource::Ingestion(active_ingestion) => active_ingestion,
2150            _ => unreachable!("verified collection refers to ingestion"),
2151        };
2152
2153        curr_ingestion.desc = source_desc;
2154        debug!("altered {ingestion_id}'s SourceDesc");
2155
2156        Ok(())
2157    }
2158
2159    async fn alter_ingestion_export_data_configs(
2160        &self,
2161        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
2162    ) -> Result<(), StorageError<Self::Timestamp>> {
2163        let mut self_collections = self.collections.lock().expect("lock poisoned");
2164
2165        for (source_export_id, new_data_config) in source_exports {
2166            // We need to adjust the data config on the CollectionState for
2167            // the source export collection directly
2168            let source_export_collection = self_collections
2169                .get_mut(&source_export_id)
2170                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2171            let ingestion_id = match &mut source_export_collection.description.data_source {
2172                DataSource::IngestionExport {
2173                    ingestion_id,
2174                    details: _,
2175                    data_config,
2176                } => {
2177                    *data_config = new_data_config.clone();
2178                    *ingestion_id
2179                }
2180                o => {
2181                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2182                    Err(StorageError::IdentifierInvalid(source_export_id))?
2183                }
2184            };
2185            // We also need to adjust the data config on the CollectionState of the
2186            // Ingestion that the export is associated with.
2187            let ingestion_collection = self_collections
2188                .get_mut(&ingestion_id)
2189                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
2190
2191            match &mut ingestion_collection.description.data_source {
2192                DataSource::Ingestion(ingestion_desc) => {
2193                    let source_export = ingestion_desc
2194                        .source_exports
2195                        .get_mut(&source_export_id)
2196                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2197
2198                    if source_export.data_config != new_data_config {
2199                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
2200                        source_export.data_config = new_data_config;
2201                    } else {
2202                        tracing::warn!(
2203                            "alter_ingestion_export_data_configs called on \
2204                                    export {source_export_id} of {ingestion_id} but \
2205                                    the data config was the same"
2206                        );
2207                    }
2208                }
2209                o => {
2210                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2211                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
2212                }
2213            }
2214        }
2215
2216        Ok(())
2217    }
2218
2219    async fn alter_ingestion_connections(
2220        &self,
2221        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
2222    ) -> Result<(), StorageError<Self::Timestamp>> {
2223        let mut self_collections = self.collections.lock().expect("lock poisoned");
2224
2225        for (id, conn) in source_connections {
2226            let collection = self_collections
2227                .get_mut(&id)
2228                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
2229
2230            match &mut collection.description.data_source {
2231                DataSource::Ingestion(ingestion) => {
2232                    // If the connection hasn't changed, there's no sense in
2233                    // re-rendering the dataflow.
2234                    if ingestion.desc.connection != conn {
2235                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
2236                        ingestion.desc.connection = conn;
2237                    } else {
2238                        warn!(
2239                            "update_source_connection called on {id} but the \
2240                            connection was the same"
2241                        );
2242                    }
2243                }
2244                o => {
2245                    warn!("update_source_connection called on {:?}", o);
2246                    Err(StorageError::IdentifierInvalid(id))?;
2247                }
2248            }
2249        }
2250
2251        Ok(())
2252    }
2253
2254    async fn alter_table_desc(
2255        &self,
2256        existing_collection: GlobalId,
2257        new_collection: GlobalId,
2258        new_desc: RelationDesc,
2259        expected_version: RelationVersion,
2260    ) -> Result<(), StorageError<Self::Timestamp>> {
2261        let data_shard = {
2262            let self_collections = self.collections.lock().expect("lock poisoned");
2263            let existing = self_collections
2264                .get(&existing_collection)
2265                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2266
2267            // TODO(alter_table): Support changes to sources.
2268            if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
2269                return Err(StorageError::IdentifierInvalid(existing_collection));
2270            }
2271
2272            existing.collection_metadata.data_shard
2273        };
2274
2275        let persist_client = self
2276            .persist
2277            .open(self.persist_location.clone())
2278            .await
2279            .unwrap();
2280
2281        // Evolve the schema of this shard.
2282        let diagnostics = Diagnostics {
2283            shard_name: existing_collection.to_string(),
2284            handle_purpose: "alter_table_desc".to_string(),
2285        };
2286        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2287        let expected_schema = expected_version.into();
2288        let schema_result = persist_client
2289            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2290                data_shard,
2291                expected_schema,
2292                &new_desc,
2293                &UnitSchema,
2294                diagnostics,
2295            )
2296            .await
2297            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2298        tracing::info!(
2299            ?existing_collection,
2300            ?new_collection,
2301            ?new_desc,
2302            "evolved schema"
2303        );
2304
2305        match schema_result {
2306            CaESchema::Ok(id) => id,
2307            // TODO(alter_table): If we get an expected mismatch we should retry.
2308            CaESchema::ExpectedMismatch {
2309                schema_id,
2310                key,
2311                val,
2312            } => {
2313                mz_ore::soft_panic_or_log!(
2314                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2315                );
2316                return Err(StorageError::Generic(anyhow::anyhow!(
2317                    "schema expected mismatch, {existing_collection:?}",
2318                )));
2319            }
2320            CaESchema::Incompatible => {
2321                mz_ore::soft_panic_or_log!(
2322                    "incompatible schema! {existing_collection} {new_desc:?}"
2323                );
2324                return Err(StorageError::Generic(anyhow::anyhow!(
2325                    "schema incompatible, {existing_collection:?}"
2326                )));
2327            }
2328        };
2329
2330        // Once the new schema is registered we can open new data handles.
2331        let (write_handle, since_handle) = self
2332            .open_data_handles(
2333                &new_collection,
2334                data_shard,
2335                None,
2336                new_desc.clone(),
2337                &persist_client,
2338            )
2339            .await;
2340
2341        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2342        // new version was registered with txn-wal?
2343
2344        // Great! Our new schema is registered with Persist, now we need to update our internal
2345        // data structures.
2346        {
2347            let mut self_collections = self.collections.lock().expect("lock poisoned");
2348
2349            // Update the existing collection so we know it's a "projection" of this new one.
2350            let existing = self_collections
2351                .get_mut(&existing_collection)
2352                .expect("existing collection missing");
2353
2354            // A higher level should already be asserting this, but let's make sure.
2355            assert!(matches!(
2356                existing.description.data_source,
2357                DataSource::Table { primary: None }
2358            ));
2359
2360            // The existing version of the table will depend on the new version.
2361            existing.description.data_source = DataSource::Table {
2362                primary: Some(new_collection),
2363            };
2364            existing.storage_dependencies.push(new_collection);
2365
2366            // Copy over the frontiers from the previous version.
2367            // The new table starts with two holds - the implied capability, and the hold from
2368            // the previous version - both at the previous version's read frontier.
2369            let implied_capability = existing.read_capabilities.frontier().to_owned();
2370            let write_frontier = existing.write_frontier.clone();
2371
2372            // Determine the relevant read capabilities on the new collection.
2373            //
2374            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2375            // here, but that only installed a ReadHold on the new collection for the implied
2376            // capability of the existing collection. This would cause runtime panics because it
2377            // would eventually result in negative read capabilities.
2378            let mut changes = ChangeBatch::new();
2379            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2380
2381            // Note: The new collection is now the "primary collection" so we specify `None` here.
2382            let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
2383            let collection_meta = CollectionMetadata {
2384                persist_location: self.persist_location.clone(),
2385                relation_desc: collection_desc.desc.clone(),
2386                data_shard,
2387                txns_shard: Some(self.txns_read.txns_id().clone()),
2388            };
2389            let collection_state = CollectionState::new(
2390                collection_desc,
2391                implied_capability,
2392                write_frontier,
2393                Vec::new(),
2394                collection_meta,
2395            );
2396
2397            // Add a record of the new collection.
2398            self_collections.insert(new_collection, collection_state);
2399
2400            let mut updates = BTreeMap::from([(new_collection, changes)]);
2401            StorageCollectionsImpl::update_read_capabilities_inner(
2402                &self.cmd_tx,
2403                &mut *self_collections,
2404                &mut updates,
2405            );
2406        };
2407
2408        // TODO(alter_table): Support changes to sources.
2409        self.register_handles(new_collection, true, since_handle, write_handle);
2410
2411        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2412
2413        Ok(())
2414    }
2415
2416    fn drop_collections_unvalidated(
2417        &self,
2418        storage_metadata: &StorageMetadata,
2419        identifiers: Vec<GlobalId>,
2420    ) {
2421        debug!(?identifiers, "drop_collections_unvalidated");
2422
2423        let mut self_collections = self.collections.lock().expect("lock poisoned");
2424
2425        for id in identifiers.iter() {
2426            let metadata = storage_metadata.get_collection_shard::<T>(*id);
2427            mz_ore::soft_assert_or_log!(
2428                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2429                "dropping {id}, but drop was not synchronized with storage \
2430                controller via `synchronize_collections`"
2431            );
2432
2433            let dropped_data_source = match self_collections.get(id) {
2434                Some(col) => col.description.data_source.clone(),
2435                None => continue,
2436            };
2437
2438            // If we are dropping source exports, we need to modify the
2439            // ingestion that it runs on.
2440            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
2441                // Adjust the source to remove this export.
2442                let ingestion = match self_collections.get_mut(&ingestion_id) {
2443                    Some(ingestion) => ingestion,
2444                    // Primary ingestion already dropped.
2445                    None => {
2446                        tracing::error!(
2447                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
2448                        );
2449                        continue;
2450                    }
2451                };
2452
2453                match &mut ingestion.description {
2454                    CollectionDescription {
2455                        data_source: DataSource::Ingestion(ingestion_desc),
2456                        ..
2457                    } => {
2458                        let removed = ingestion_desc.source_exports.remove(id);
2459                        mz_ore::soft_assert_or_log!(
2460                            removed.is_some(),
2461                            "dropped subsource {id} already removed from source exports"
2462                        );
2463                    }
2464                    _ => unreachable!(
2465                        "SourceExport must only refer to primary sources that already exist"
2466                    ),
2467                };
2468            }
2469        }
2470
2471        // Policies that advance the since to the empty antichain. We do still
2472        // honor outstanding read holds, and collections will only be dropped
2473        // once those are removed as well.
2474        //
2475        // We don't explicitly remove read capabilities! Downgrading the
2476        // frontier of the source to `[]` (the empty Antichain), will propagate
2477        // to the storage dependencies.
2478        let mut finalized_policies = Vec::new();
2479
2480        for id in identifiers {
2481            // Make sure it's still there, might already have been deleted.
2482            if self_collections.contains_key(&id) {
2483                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2484            }
2485        }
2486        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2487
2488        drop(self_collections);
2489
2490        self.synchronize_finalized_shards(storage_metadata);
2491    }
2492
2493    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2494        let mut collections = self.collections.lock().expect("lock poisoned");
2495
2496        if tracing::enabled!(tracing::Level::TRACE) {
2497            let user_capabilities = collections
2498                .iter_mut()
2499                .filter(|(id, _c)| id.is_user())
2500                .map(|(id, c)| {
2501                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2502                    (*id, c.implied_capability.clone(), updates)
2503                })
2504                .collect_vec();
2505
2506            trace!(?policies, ?user_capabilities, "set_read_policies");
2507        }
2508
2509        self.set_read_policies_inner(&mut collections, policies);
2510
2511        if tracing::enabled!(tracing::Level::TRACE) {
2512            let user_capabilities = collections
2513                .iter_mut()
2514                .filter(|(id, _c)| id.is_user())
2515                .map(|(id, c)| {
2516                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2517                    (*id, c.implied_capability.clone(), updates)
2518                })
2519                .collect_vec();
2520
2521            trace!(?user_capabilities, "after! set_read_policies");
2522        }
2523    }
2524
2525    fn acquire_read_holds(
2526        &self,
2527        desired_holds: Vec<GlobalId>,
2528    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
2529        if desired_holds.is_empty() {
2530            return Ok(vec![]);
2531        }
2532
2533        let mut collections = self.collections.lock().expect("lock poisoned");
2534
2535        let mut advanced_holds = Vec::new();
2536        // We advance the holds by our current since frontier. Can't acquire
2537        // holds for times that have been compacted away!
2538        //
2539        // NOTE: We acquire read holds at the earliest possible time rather than
2540        // at the implied capability. This is so that, for example, adapter can
2541        // acquire a read hold to hold back the frontier, giving the COMPUTE
2542        // controller a chance to also acquire a read hold at that early
2543        // frontier. If/when we change the interplay between adapter and COMPUTE
2544        // to pass around ReadHold tokens, we might tighten this up and instead
2545        // acquire read holds at the implied capability.
2546        for id in desired_holds.iter() {
2547            let collection = collections
2548                .get(id)
2549                .ok_or(ReadHoldError::CollectionMissing(*id))?;
2550            let since = collection.read_capabilities.frontier().to_owned();
2551            advanced_holds.push((*id, since));
2552        }
2553
2554        let mut updates = advanced_holds
2555            .iter()
2556            .map(|(id, hold)| {
2557                let mut changes = ChangeBatch::new();
2558                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2559                (*id, changes)
2560            })
2561            .collect::<BTreeMap<_, _>>();
2562
2563        StorageCollectionsImpl::update_read_capabilities_inner(
2564            &self.cmd_tx,
2565            &mut collections,
2566            &mut updates,
2567        );
2568
2569        let acquired_holds = advanced_holds
2570            .into_iter()
2571            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2572            .collect_vec();
2573
2574        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2575
2576        Ok(acquired_holds)
2577    }
2578
2579    /// Determine time dependence information for the object.
2580    fn determine_time_dependence(
2581        &self,
2582        id: GlobalId,
2583    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2584        use TimeDependenceError::CollectionMissing;
2585        let collections = self.collections.lock().expect("lock poisoned");
2586        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);
2587
2588        let mut result = None;
2589
2590        while let Some(c) = collection.take() {
2591            use DataSource::*;
2592            if let Some(timeline) = &c.description.timeline {
2593                // Only the epoch timeline follows wall-clock.
2594                if *timeline != Timeline::EpochMilliseconds {
2595                    break;
2596                }
2597            }
2598            match &c.description.data_source {
2599                Ingestion(ingestion) => {
2600                    use GenericSourceConnection::*;
2601                    match ingestion.desc.connection {
2602                        // Kafka, Postgres, MySql, and SQL Server sources all
2603                        // follow wall clock.
2604                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
2605                            result = Some(TimeDependence::default())
2606                        }
2607                        // Load generators not further specified.
2608                        LoadGenerator(_) => {}
2609                    }
2610                }
2611                IngestionExport { ingestion_id, .. } => {
2612                    let c = collections
2613                        .get(ingestion_id)
2614                        .ok_or(CollectionMissing(*ingestion_id))?;
2615                    collection = Some(c);
2616                }
2617                // Introspection, other, progress, table, and webhook sources follow wall clock.
2618                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2619                    result = Some(TimeDependence::default())
2620                }
2621                // Materialized views, continual tasks, etc, aren't managed by storage.
2622                Other => {}
2623                Sink { .. } => {}
2624            };
2625        }
2626        Ok(result)
2627    }
2628}
2629
2630/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
2631///
2632/// When a [StorageCollections] is in read-only mode, we will only ever acquire
2633/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
2634/// since is considered a write. Conversely, when in read-write mode, we acquire
2635/// [SinceHandle].
2636#[derive(Debug)]
2637enum SinceHandleWrapper<T>
2638where
2639    T: TimelyTimestamp + Lattice + Codec64,
2640{
2641    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
2642    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
2643}
2644
2645impl<T> SinceHandleWrapper<T>
2646where
2647    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2648{
2649    pub fn since(&self) -> &Antichain<T> {
2650        match self {
2651            Self::Critical(handle) => handle.since(),
2652            Self::Leased(handle) => handle.since(),
2653        }
2654    }
2655
2656    pub fn opaque(&self) -> PersistEpoch {
2657        match self {
2658            Self::Critical(handle) => handle.opaque().clone(),
2659            Self::Leased(_handle) => {
2660                // The opaque is expected to be used with
2661                // `compare_and_downgrade_since`, and the leased handle doesn't
2662                // have a notion of an opaque. We pretend here and in
2663                // `compare_and_downgrade_since`.
2664                PersistEpoch(None)
2665            }
2666        }
2667    }
2668
2669    pub async fn compare_and_downgrade_since(
2670        &mut self,
2671        expected: &PersistEpoch,
2672        new: (&PersistEpoch, &Antichain<T>),
2673    ) -> Result<Antichain<T>, PersistEpoch> {
2674        match self {
2675            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
2676            Self::Leased(handle) => {
2677                let (opaque, since) = new;
2678                assert_none!(opaque.0);
2679
2680                handle.downgrade_since(since).await;
2681
2682                Ok(since.clone())
2683            }
2684        }
2685    }
2686
2687    pub async fn maybe_compare_and_downgrade_since(
2688        &mut self,
2689        expected: &PersistEpoch,
2690        new: (&PersistEpoch, &Antichain<T>),
2691    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2692        match self {
2693            Self::Critical(handle) => {
2694                handle
2695                    .maybe_compare_and_downgrade_since(expected, new)
2696                    .await
2697            }
2698            Self::Leased(handle) => {
2699                let (opaque, since) = new;
2700                assert_none!(opaque.0);
2701
2702                handle.maybe_downgrade_since(since).await;
2703
2704                Some(Ok(since.clone()))
2705            }
2706        }
2707    }
2708
2709    pub fn snapshot_stats(
2710        &self,
2711        id: GlobalId,
2712        as_of: Option<Antichain<T>>,
2713    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2714        match self {
2715            Self::Critical(handle) => {
2716                let res = handle
2717                    .snapshot_stats(as_of)
2718                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2719                Box::pin(res)
2720            }
2721            Self::Leased(handle) => {
2722                let res = handle
2723                    .snapshot_stats(as_of)
2724                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2725                Box::pin(res)
2726            }
2727        }
2728    }
2729
2730    pub fn snapshot_stats_from_txn(
2731        &self,
2732        id: GlobalId,
2733        data_snapshot: DataSnapshot<T>,
2734    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2735        match self {
2736            Self::Critical(handle) => Box::pin(
2737                data_snapshot
2738                    .snapshot_stats_from_critical(handle)
2739                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2740            ),
2741            Self::Leased(handle) => Box::pin(
2742                data_snapshot
2743                    .snapshot_stats_from_leased(handle)
2744                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2745            ),
2746        }
2747    }
2748}
2749
/// State maintained about individual collections.
///
/// One entry is kept per `GlobalId` in the shared `collections` map. A
/// collection counts as dropped once `read_capabilities` is empty (see
/// `is_dropped`).
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// Description with which the collection was created.
    pub description: CollectionDescription<T>,

    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation will always contain `self.implied_capability`, but may
    /// also contain capabilities held by others who have read dependencies on
    /// this collection.
    pub read_capabilities: MutableAntichain<T>,

    /// The implicit capability associated with collection creation.  This
    /// should never be less than the since of the associated persist
    /// collection.
    pub implied_capability: Antichain<T>,

    /// The policy to use to downgrade `self.implied_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// Persist metadata for the collection: its persist location, relation
    /// description, data shard, and optional txns shard.
    pub collection_metadata: CollectionMetadata,
}
2779
2780impl<T: TimelyTimestamp> CollectionState<T> {
2781    /// Creates a new collection state, with an initial read policy valid from
2782    /// `since`.
2783    pub fn new(
2784        description: CollectionDescription<T>,
2785        since: Antichain<T>,
2786        write_frontier: Antichain<T>,
2787        storage_dependencies: Vec<GlobalId>,
2788        metadata: CollectionMetadata,
2789    ) -> Self {
2790        let mut read_capabilities = MutableAntichain::new();
2791        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2792        Self {
2793            description,
2794            read_capabilities,
2795            implied_capability: since.clone(),
2796            read_policy: ReadPolicy::NoPolicy {
2797                initial_since: since,
2798            },
2799            storage_dependencies,
2800            write_frontier,
2801            collection_metadata: metadata,
2802        }
2803    }
2804
2805    /// Returns whether the collection was dropped.
2806    pub fn is_dropped(&self) -> bool {
2807        self.read_capabilities.is_empty()
2808    }
2809}
2810
2811/// A task that keeps persist handles, downgrades sinces when asked,
2812/// periodically gets recent uppers from them, and updates the shard collection
2813/// state when needed.
2814///
2815/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
2816#[derive(Debug)]
2817struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
2818    config: Arc<Mutex<StorageConfiguration>>,
2819    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
2820    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
2821    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
2822    finalizable_shards: Arc<ShardIdSet>,
2823    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
2824    // So we know what shard ID corresponds to what global ID, which we need
2825    // when re-enqueing futures for determining the next upper update.
2826    shard_by_id: BTreeMap<GlobalId, ShardId>,
2827    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
2828    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
2829    txns_shards: BTreeSet<GlobalId>,
2830}
2831
2832#[derive(Debug)]
2833enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
2834    Register {
2835        id: GlobalId,
2836        is_in_txns: bool,
2837        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
2838        since_handle: SinceHandleWrapper<T>,
2839    },
2840    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
2841    SnapshotStats(
2842        GlobalId,
2843        SnapshotStatsAsOf<T>,
2844        oneshot::Sender<SnapshotStatsRes<T>>,
2845    ),
2846}
2847
2848/// A newtype wrapper to hang a Debug impl off of.
2849pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2850
2851impl<T> Debug for SnapshotStatsRes<T> {
2852    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2853        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2854    }
2855}
2856
2857impl<T> BackgroundTask<T>
2858where
2859    T: TimelyTimestamp
2860        + Lattice
2861        + Codec64
2862        + From<EpochMillis>
2863        + TimestampManipulation
2864        + Into<mz_repr::Timestamp>
2865        + Sync,
2866{
2867    async fn run(&mut self) {
2868        // Futures that fetch the recent upper from all other shards.
2869        let mut upper_futures: FuturesUnordered<
2870            std::pin::Pin<
2871                Box<
2872                    dyn Future<
2873                            Output = (
2874                                GlobalId,
2875                                WriteHandle<SourceData, (), T, StorageDiff>,
2876                                Antichain<T>,
2877                            ),
2878                        > + Send,
2879                >,
2880            >,
2881        > = FuturesUnordered::new();
2882
2883        let gen_upper_future =
2884            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
2885                let fut = async move {
2886                    soft_assert_or_log!(
2887                        !prev_upper.is_empty(),
2888                        "cannot await progress when upper is already empty"
2889                    );
2890                    handle.wait_for_upper_past(&prev_upper).await;
2891                    let new_upper = handle.shared_upper();
2892                    (id, handle, new_upper)
2893                };
2894
2895                fut
2896            };
2897
2898        let mut txns_upper_future = match self.txns_handle.take() {
2899            Some(txns_handle) => {
2900                let upper = txns_handle.upper().clone();
2901                let txns_upper_future =
2902                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
2903                txns_upper_future.boxed()
2904            }
2905            None => async { std::future::pending().await }.boxed(),
2906        };
2907
2908        loop {
2909            tokio::select! {
2910                (id, handle, upper) = &mut txns_upper_future => {
2911                    trace!("new upper from txns shard: {:?}", upper);
2912                    let mut uppers = Vec::new();
2913                    for id in self.txns_shards.iter() {
2914                        uppers.push((*id, &upper));
2915                    }
2916                    self.update_write_frontiers(&uppers).await;
2917
2918                    let fut = gen_upper_future(id, handle, upper);
2919                    txns_upper_future = fut.boxed();
2920                }
2921                Some((id, handle, upper)) = upper_futures.next() => {
2922                    if id.is_user() {
2923                        trace!("new upper for collection {id}: {:?}", upper);
2924                    }
2925                    let current_shard = self.shard_by_id.get(&id);
2926                    if let Some(shard_id) = current_shard {
2927                        if shard_id == &handle.shard_id() {
2928                            // Still current, so process the update and enqueue
2929                            // again!
2930                            let uppers = &[(id, &upper)];
2931                            self.update_write_frontiers(uppers).await;
2932                            if !upper.is_empty() {
2933                                let fut = gen_upper_future(id, handle, upper);
2934                                upper_futures.push(fut.boxed());
2935                            }
2936                        } else {
2937                            // Be polite and expire the write handle. This can
2938                            // happen when we get an upper update for a write
2939                            // handle that has since been replaced via Update.
2940                            handle.expire().await;
2941                        }
2942                    }
2943                }
2944                cmd = self.cmds_rx.recv() => {
2945                    let cmd = if let Some(cmd) = cmd {
2946                        cmd
2947                    } else {
2948                        // We're done!
2949                        break;
2950                    };
2951
2952                    match cmd {
2953                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
2954                            debug!("registering handles for {}", id);
2955                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
2956                            if previous.is_some() {
2957                                panic!("already registered a WriteHandle for collection {id}");
2958                            }
2959
2960                            let previous = self.since_handles.insert(id, since_handle);
2961                            if previous.is_some() {
2962                                panic!("already registered a SinceHandle for collection {id}");
2963                            }
2964
2965                            if is_in_txns {
2966                                self.txns_shards.insert(id);
2967                            } else {
2968                                let upper = write_handle.upper().clone();
2969                                if !upper.is_empty() {
2970                                    let fut = gen_upper_future(id, write_handle, upper);
2971                                    upper_futures.push(fut.boxed());
2972                                }
2973                            }
2974
2975                        }
2976                        BackgroundCmd::DowngradeSince(cmds) => {
2977                            self.downgrade_sinces(cmds).await;
2978                        }
2979                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
2980                            // NB: The requested as_of could be arbitrarily far
2981                            // in the future. So, in order to avoid blocking
2982                            // this loop until it's available and the
2983                            // `snapshot_stats` call resolves, instead return
2984                            // the future to the caller and await it there.
2985                            let res = match self.since_handles.get(&id) {
2986                                Some(x) => {
2987                                    let fut: BoxFuture<
2988                                        'static,
2989                                        Result<SnapshotStats, StorageError<T>>,
2990                                    > = match as_of {
2991                                        SnapshotStatsAsOf::Direct(as_of) => {
2992                                            x.snapshot_stats(id, Some(as_of))
2993                                        }
2994                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
2995                                            x.snapshot_stats_from_txn(id, data_snapshot)
2996                                        }
2997                                    };
2998                                    SnapshotStatsRes(fut)
2999                                }
3000                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
3001                                    StorageError::IdentifierMissing(id),
3002                                )))),
3003                            };
3004                            // It's fine if the listener hung up.
3005                            let _ = tx.send(res);
3006                        }
3007                    }
3008                }
3009                Some(holds_changes) = self.holds_rx.recv() => {
3010                    let mut batched_changes = BTreeMap::new();
3011                    batched_changes.insert(holds_changes.0, holds_changes.1);
3012
3013                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
3014                        let entry = batched_changes.entry(holds_changes.0);
3015                        entry
3016                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
3017                            .or_insert_with(|| holds_changes.1);
3018                    }
3019
3020                    let mut collections = self.collections.lock().expect("lock poisoned");
3021
3022                    let user_changes = batched_changes
3023                        .iter()
3024                        .filter(|(id, _c)| id.is_user())
3025                        .map(|(id, c)| {
3026                            (id.clone(), c.clone())
3027                        })
3028                        .collect_vec();
3029
3030                    if !user_changes.is_empty() {
3031                        trace!(?user_changes, "applying holds changes from channel");
3032                    }
3033
3034                    StorageCollectionsImpl::update_read_capabilities_inner(
3035                        &self.cmds_tx,
3036                        &mut collections,
3037                        &mut batched_changes,
3038                    );
3039                }
3040            }
3041        }
3042
3043        warn!("BackgroundTask shutting down");
3044    }
3045
3046    #[instrument(level = "debug")]
3047    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
3048        let mut read_capability_changes = BTreeMap::default();
3049
3050        let mut self_collections = self.collections.lock().expect("lock poisoned");
3051
3052        for (id, new_upper) in updates.iter() {
3053            let collection = if let Some(c) = self_collections.get_mut(id) {
3054                c
3055            } else {
3056                trace!(
3057                    "Reference to absent collection {id}, due to concurrent removal of that collection"
3058                );
3059                continue;
3060            };
3061
3062            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
3063                collection.write_frontier.clone_from(new_upper);
3064            }
3065
3066            let mut new_read_capability = collection
3067                .read_policy
3068                .frontier(collection.write_frontier.borrow());
3069
3070            if id.is_user() {
3071                trace!(
3072                    %id,
3073                    implied_capability = ?collection.implied_capability,
3074                    policy = ?collection.read_policy,
3075                    write_frontier = ?collection.write_frontier,
3076                    ?new_read_capability,
3077                    "update_write_frontiers");
3078            }
3079
3080            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
3081                let mut update = ChangeBatch::new();
3082                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
3083                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
3084                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
3085
3086                if !update.is_empty() {
3087                    read_capability_changes.insert(*id, update);
3088                }
3089            }
3090        }
3091
3092        if !read_capability_changes.is_empty() {
3093            StorageCollectionsImpl::update_read_capabilities_inner(
3094                &self.cmds_tx,
3095                &mut self_collections,
3096                &mut read_capability_changes,
3097            );
3098        }
3099    }
3100
3101    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
3102        for (id, new_since) in cmds {
3103            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
3104                c
3105            } else {
3106                // This can happen when someone concurrently drops a collection.
3107                trace!("downgrade_sinces: reference to absent collection {id}");
3108                continue;
3109            };
3110
3111            if id.is_user() {
3112                trace!("downgrading since of {} to {:?}", id, new_since);
3113            }
3114
3115            let epoch = since_handle.opaque().clone();
3116            let result = if new_since.is_empty() {
3117                // A shard's since reaching the empty frontier is a prereq for
3118                // being able to finalize a shard, so the final downgrade should
3119                // never be rate-limited.
3120                let res = Some(
3121                    since_handle
3122                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3123                        .await,
3124                );
3125
3126                info!(%id, "removing persist handles because the since advanced to []!");
3127
3128                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
3129                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
3130                    shard_id
3131                } else {
3132                    panic!("missing GlobalId -> ShardId mapping for id {id}");
3133                };
3134
3135                // We're not responsible for writes to tables, so we also don't
3136                // de-register them from the txn system. Whoever is responsible
3137                // will remove them. We only make sure to remove the table from
3138                // our tracking.
3139                self.txns_shards.remove(&id);
3140
3141                if !self
3142                    .config
3143                    .lock()
3144                    .expect("lock poisoned")
3145                    .parameters
3146                    .finalize_shards
3147                {
3148                    info!(
3149                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3150                    );
3151                    return;
3152                }
3153
3154                info!(%id, %dropped_shard_id, "enqueing shard finalization due to dropped collection and dropped persist handle");
3155
3156                self.finalizable_shards.lock().insert(dropped_shard_id);
3157
3158                res
3159            } else {
3160                since_handle
3161                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3162                    .await
3163            };
3164
3165            if let Some(Err(other_epoch)) = result {
3166                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
3167            }
3168        }
3169    }
3170}
3171
3172struct FinalizeShardsTaskConfig {
3173    envd_epoch: NonZeroI64,
3174    config: Arc<Mutex<StorageConfiguration>>,
3175    metrics: StorageCollectionsMetrics,
3176    finalizable_shards: Arc<ShardIdSet>,
3177    finalized_shards: Arc<ShardIdSet>,
3178    persist_location: PersistLocation,
3179    persist: Arc<PersistClientCache>,
3180    read_only: bool,
3181}
3182
/// Background loop that finalizes the shards enqueued in `finalizable_shards`.
///
/// In read-only mode this returns immediately. Otherwise it loops forever,
/// waking every 5 seconds. On each tick it does nothing unless the
/// `finalize_shards` storage parameter is enabled and `finalizable_shards` is
/// non-empty.
///
/// When it does run, it snapshots `finalizable_shards` and processes up to 10
/// shards concurrently:
///
/// 1. If persist already reports the shard as finalized, nothing else is done.
/// 2. Otherwise the shard's upper is advanced to the empty antichain using a
///    throwaway writer.
/// 3. If `STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION` is set, the
///    controller's critical since is also downgraded to the empty antichain.
/// 4. Finally, `finalize_shard` is called.
///
/// Successfully handled shards are moved from `finalizable_shards` to
/// `finalized_shards`. Failed attempts remain in `finalizable_shards` and are
/// retried on a later tick.
///
/// # Panics
///
/// Panics if the persist client cannot be opened, or if persist reports invalid
/// usage while checking finalization status or opening handles.
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    // If a pass takes longer than the period, delay the next tick instead of
    // firing a burst of catch-up ticks.
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        // The parameter is re-read every tick so toggling it takes effect
        // without restarting the task.
        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
            // We hold the lock for as short as possible and pull our cloned set
            // of shards.
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        // Open a persist client to delete unused shards.
        // NOTE(review): a failure here panics the task instead of retrying on
        // the next tick — confirm that is intended.
        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        // Rebind as references so the per-shard `async move` blocks below
        // capture cheap borrows rather than moving the originals.
        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                // Counts every attempt, including shards that turn out to be
                // already finalized.
                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        // TODO: thread the global ID into the shard finalization WAL
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // We only use the writer to advance the upper, so using a dummy schema is
                        // fine.
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                // We're newer, but it's fine to use the
                                // handle's old epoch to try and downgrade.
                                handle_epoch
                            } else {
                                // Good luck, buddy! The downgrade below will
                                // not succeed. There's a process with a newer
                                // epoch out there and someone at some juncture
                                // will fence out this process.
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                // NOTE(review): returning Ok(()) makes the
                                // caller treat this shard as finalized (it is
                                // moved to `finalized_shards`) even though
                                // `finalize_shard` was never called — confirm
                                // this is intended.
                                return Ok(());
                            }
                            // Not available now, so finalization is broken.
                            // since_handle.expire().await;
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            // Rather than error, just leave this shard as
                            // one to finalize later.
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            // Poll each future for each collection concurrently, maximum of 10
            // at a time.
            // TODO(benesch): the concurrency here should be configurable
            // via LaunchDarkly.
            .buffer_unordered(10)
            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
            // The closure passed to `for_each` must remain fast or we risk
            // starving the finalization futures of calls to `poll`.
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // We make successfully finalized shards available for
                        // removal from the finalization WAL one by one, so that
                        // a handful of stuck shards don't prevent us from
                        // removing the shards that have made progress. The
                        // overhead of repeatedly acquiring and releasing the
                        // locks is negligible.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}
3379
3380#[derive(Debug)]
3381pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
3382    /// Stats for a shard with an "eager" upper (one that continually advances
3383    /// as time passes, even if no writes are coming in).
3384    Direct(Antichain<T>),
3385    /// Stats for a shard with a "lazy" upper (one that only physically advances
3386    /// in response to writes).
3387    Txns(DataSnapshot<T>),
3388}
3389
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    /// Exercises `BackgroundCmd::SnapshotStats` end to end against an
    /// in-memory persist backend. It checks three things:
    /// - unknown ids produce an error;
    /// - requests for an as_of beyond the upper stay pending until data is
    ///   written;
    /// - the background loop keeps serving other requests in the meantime.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // No stats for unknown GlobalId.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats don't resolve for as_of past the upper.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Call it again because `now_or_never` consumed our future and it's not clone-able.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write some data.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write more data and unblock the ts 1 call
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        // Make sure it runs until at least here.
        drop(background_task);
    }

    /// Sends a `SnapshotStats` request for `id` at `as_of` to the background
    /// task. It then awaits the returned stats future.
    ///
    /// # Panics
    ///
    /// Panics if the command channel is closed or the background task drops
    /// the reply sender.
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        /// Builds a `BackgroundTask` with empty state and test-only
        /// configuration. It returns the task together with a sender for its
        /// command channel.
        ///
        /// The holds sender is dropped immediately, so no read-hold changes
        /// are ever delivered.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}