mz_storage_client/storage_collections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::num::NonZeroI64;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use futures::future::BoxFuture;
22use futures::stream::{BoxStream, FuturesUnordered};
23use futures::{Future, FutureExt, StreamExt};
24use itertools::Itertools;
25
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::{EpochMillis, NowFn};
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::Codec64;
40use mz_persist_types::codec_impls::UnitSchema;
41use mz_persist_types::txn::TxnsCodec;
42use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
43use mz_storage_types::StorageDiff;
44use mz_storage_types::configuration::StorageConfiguration;
45use mz_storage_types::connections::ConnectionContext;
46use mz_storage_types::connections::inline::InlinedConnection;
47use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
48use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
49use mz_storage_types::errors::CollectionMissing;
50use mz_storage_types::parameters::StorageParameters;
51use mz_storage_types::read_holds::ReadHold;
52use mz_storage_types::read_policy::ReadPolicy;
53use mz_storage_types::sources::{
54    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
55    SourceExportDataConfig, Timeline,
56};
57use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
58use mz_txn_wal::metrics::Metrics as TxnMetrics;
59use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
60use mz_txn_wal::txns::TxnsHandle;
61use timely::PartialOrder;
62use timely::order::TotalOrder;
63use timely::progress::frontier::MutableAntichain;
64use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
65use tokio::sync::{mpsc, oneshot};
66use tokio::time::MissedTickBehavior;
67use tracing::{debug, info, trace, warn};
68
69use crate::client::TimestamplessUpdateBuilder;
70use crate::controller::{
71    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
72};
73use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
74
75mod metrics;
76
77/// An abstraction for keeping track of storage collections and managing access
78/// to them.
79///
80/// Responsibilities:
81///
82/// - Keeps a critical persist handle for holding the since of collections
83///   where it needs to be.
84///
85/// - Drives the since forward based on the upper of a collection and a
86///   [ReadPolicy].
87///
88/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
89///   advancing while it needs to be read at a specific time.
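///
/// A minimal usage sketch (not compiled here; assumes an implementation such
/// as [StorageCollectionsImpl] and an existing collection `id`):
///
/// ```ignore
/// // Prevent the collection's since from advancing while we use it.
/// let hold = storage_collections
///     .acquire_read_holds(vec![id])?
///     .into_iter()
///     .next()
///     .expect("requested exactly one hold");
///
/// // Inspect the collection's frontiers at any point.
/// let frontiers = storage_collections.collection_frontiers(id)?;
/// println!(
///     "readable between {:?} and {:?}",
///     frontiers.read_capabilities, frontiers.write_frontier
/// );
///
/// // Dropping the hold releases the read capability again.
/// drop(hold);
/// ```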
90#[async_trait]
91pub trait StorageCollections: Debug + Sync {
92    type Timestamp: TimelyTimestamp;
93
94    /// On boot, reconcile this [StorageCollections] with outside state. We get
95    /// a [StorageTxn] where we can record any durable state that we need.
96    ///
97    /// We get `init_ids`, which tells us about all collections that currently
98    /// exist, so that we can record durable state for those that _we_ don't
99/// yet know about.
100    async fn initialize_state(
101        &self,
102        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
103        init_ids: BTreeSet<GlobalId>,
104    ) -> Result<(), StorageError<Self::Timestamp>>;
105
106    /// Update storage configuration with new parameters.
107    fn update_parameters(&self, config_params: StorageParameters);
108
109    /// Returns the [CollectionMetadata] of the collection identified by `id`.
110    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;
111
112    /// Returns the [CollectionMetadata] of all active
113    /// collections.
114    ///
115    /// A collection is "active" when it has a non-empty frontier of read
116    /// capabilities.
117    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
118
119    /// Returns the frontiers of the identified collection.
120    fn collection_frontiers(
121        &self,
122        id: GlobalId,
123    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
124        let frontiers = self
125            .collections_frontiers(vec![id])?
126            .expect_element(|| "known to exist");
127
128        Ok(frontiers)
129    }
130
131    /// Atomically gets and returns the frontiers of all the identified
132    /// collections.
133    fn collections_frontiers(
134        &self,
135        id: Vec<GlobalId>,
136    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;
137
138    /// Atomically gets and returns the frontiers of all active collections.
139    ///
140    /// A collection is "active" when it has a non-empty frontier of read
141    /// capabilities.
142    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;
143
144    /// Checks whether a collection exists under the given `GlobalId`. Returns
145    /// an error if the collection does not exist.
146    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;
147
148    /// Returns aggregate statistics about the contents of the local input named
149    /// `id` at `as_of`.
150    async fn snapshot_stats(
151        &self,
152        id: GlobalId,
153        as_of: Antichain<Self::Timestamp>,
154    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;
155
156    /// Returns aggregate statistics about the contents of the local input named
157    /// `id` at `as_of`.
158    ///
159    /// Note that this async function itself returns a future. We may
160    /// need to block on the stats being available, but don't want to hold a reference
161    /// to the controller for too long... so the outer future holds a reference to the
162    /// controller but returns quickly, and the inner future is slow but does not
163    /// reference the controller.
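    ///
    /// A sketch of the intended two-step usage (not compiled here):
    ///
    /// ```ignore
    /// // The outer `await` briefly references the controller ...
    /// let stats_fut = storage_collections.snapshot_parts_stats(id, as_of).await;
    /// // ... while the returned future can be awaited much later, without
    /// // holding on to that reference.
    /// let parts_stats = stats_fut.await?;
    /// ```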
164    async fn snapshot_parts_stats(
165        &self,
166        id: GlobalId,
167        as_of: Antichain<Self::Timestamp>,
168    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;
169
170    /// Returns a snapshot of the contents of collection `id` at `as_of`.
171    fn snapshot(
172        &self,
173        id: GlobalId,
174        as_of: Self::Timestamp,
175    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;
176
177    /// Returns a snapshot of the contents of collection `id` at the largest
178    /// readable `as_of`.
179    async fn snapshot_latest(
180        &self,
181        id: GlobalId,
182    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;
183
184    /// Returns a cursor over a snapshot of the contents of collection `id` at `as_of`.
185    fn snapshot_cursor(
186        &self,
187        id: GlobalId,
188        as_of: Self::Timestamp,
189    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
190    where
191        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;
192
193    /// Generates a snapshot of the contents of collection `id` at `as_of` and
194    /// streams out all of the updates in bounded memory.
195    ///
196    /// The output is __not__ consolidated.
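    ///
    /// A usage sketch (not compiled here; assumes `as_of` is readable):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut updates = storage_collections.snapshot_and_stream(id, as_of).await?;
    /// while let Some((data, t, diff)) = updates.next().await {
    ///     // The stream is unconsolidated: the same `data` may appear several
    ///     // times, and diffs must be summed to obtain final multiplicities.
    /// }
    /// ```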
197    fn snapshot_and_stream(
198        &self,
199        id: GlobalId,
200        as_of: Self::Timestamp,
201    ) -> BoxFuture<
202        'static,
203        Result<
204            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
205            StorageError<Self::Timestamp>,
206        >,
207    >;
208
209    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
210    /// updates for the provided [`GlobalId`].
211    fn create_update_builder(
212        &self,
213        id: GlobalId,
214    ) -> BoxFuture<
215        'static,
216        Result<
217            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
218            StorageError<Self::Timestamp>,
219        >,
220    >
221    where
222        Self::Timestamp: Lattice + Codec64;
223
224    /// Update the given [`StorageTxn`] with the appropriate metadata given the
225    /// IDs to add and drop.
226    ///
227    /// The data modified in the `StorageTxn` must be made available in all
228    /// subsequent calls that require [`StorageMetadata`] as a parameter.
229    async fn prepare_state(
230        &self,
231        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
232        ids_to_add: BTreeSet<GlobalId>,
233        ids_to_drop: BTreeSet<GlobalId>,
234        ids_to_register: BTreeMap<GlobalId, ShardId>,
235    ) -> Result<(), StorageError<Self::Timestamp>>;
236
237    /// Create the collections described by the individual
238    /// [CollectionDescriptions](CollectionDescription).
239    ///
240    /// Each command carries the source id, the source description, and any
241    /// associated metadata needed to ingest the particular source.
242    ///
243    /// This command installs collection state for the indicated sources, and
244    /// they are now valid to use in queries at times beyond the initial `since`
245    /// frontiers. Each collection also acquires a read capability at this
246    /// frontier, which will need to be repeatedly downgraded with
247    /// `allow_compaction()` to permit compaction.
248    ///
249    /// This method is NOT idempotent; it can fail between processing of
250    /// different collections and leave the [StorageCollections] in an
251    /// inconsistent state. It is almost always wrong to do anything but abort
252    /// the process on `Err`.
253    ///
254    /// The `register_ts` is used as the initial timestamp that tables are
255    /// available for reads. (We might later give non-tables the same treatment,
256    /// but hold off on that initially.) Callers must provide `Some` if any of
257    /// the collections is a table. `None` may be given if none of the
258    /// collections is a table (i.e. they are all materialized views, sources, etc.).
259    ///
260    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
261    /// from the txn-wal sub-system.
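    ///
    /// For example (a sketch, not compiled here; the argument names are
    /// illustrative):
    ///
    /// ```ignore
    /// storage_collections
    ///     .create_collections_for_bootstrap(
    ///         &storage_metadata,
    ///         Some(register_ts),   // required because a table is included
    ///         vec![(table_id, table_description)],
    ///         &BTreeSet::new(),    // no migrated collections
    ///     )
    ///     .await?;
    /// ```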
262    async fn create_collections_for_bootstrap(
263        &self,
264        storage_metadata: &StorageMetadata,
265        register_ts: Option<Self::Timestamp>,
266        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
267        migrated_storage_collections: &BTreeSet<GlobalId>,
268    ) -> Result<(), StorageError<Self::Timestamp>>;
269
270    /// Alters the identified ingestion to use the provided [`SourceDesc`].
271    ///
272    /// NOTE: Ideally, [StorageCollections] would not care about these, but we
273    /// have to learn about changes such that when new subsources are created we
274    /// can correctly determine a since based on their dependencies' sinces. This
275    /// is really only relevant because newly created subsources depend on the
276    /// remap shard, and we can't just have them start at since 0.
277    async fn alter_ingestion_source_desc(
278        &self,
279        ingestion_id: GlobalId,
280        source_desc: SourceDesc,
281    ) -> Result<(), StorageError<Self::Timestamp>>;
282
283    /// Alters the data config for the specified source exports of the specified ingestions.
284    async fn alter_ingestion_export_data_configs(
285        &self,
286        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
287    ) -> Result<(), StorageError<Self::Timestamp>>;
288
289    /// Alters each identified collection to use the correlated
290    /// [`GenericSourceConnection`].
291    ///
292    /// See NOTE on [StorageCollections::alter_ingestion_source_desc].
293    async fn alter_ingestion_connections(
294        &self,
295        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
296    ) -> Result<(), StorageError<Self::Timestamp>>;
297
298    /// Updates the [`RelationDesc`] for the specified table.
299    async fn alter_table_desc(
300        &self,
301        existing_collection: GlobalId,
302        new_collection: GlobalId,
303        new_desc: RelationDesc,
304        expected_version: RelationVersion,
305    ) -> Result<(), StorageError<Self::Timestamp>>;
306
307    /// Drops the read capability for the sources and allows their resources to
308    /// be reclaimed.
309    ///
310    /// TODO(jkosh44): This method does not validate the provided identifiers.
311    /// Currently when the controller starts/restarts it has no durable state.
312    /// That means that it has no way of remembering any past commands sent. In
313    /// the future we plan on persisting state for the controller so that it is
314    /// aware of past commands. Therefore this method is for dropping sources
315    /// that we know to have been previously created, but have been forgotten by
316    /// the controller due to a restart. Once command history becomes durable we
317    /// can remove this method and use the normal `drop_sources`.
318    fn drop_collections_unvalidated(
319        &self,
320        storage_metadata: &StorageMetadata,
321        identifiers: Vec<GlobalId>,
322    );
323
324    /// Assigns a read policy to specific identifiers.
325    ///
326    /// The policies are assigned in the order presented, and repeated
327    /// identifiers should conclude with the last policy. Changing a policy will
328    /// immediately downgrade the read capability if appropriate, but it will
329    /// not "recover" the read capability if the prior capability is already
330    /// ahead of it.
331    ///
332    /// This [StorageCollections] may include its own overrides on these
333    /// policies.
334    ///
335    /// Identifiers not present in `policies` retain their existing read
336    /// policies.
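    ///
    /// For example (a sketch, not compiled here; assumes a `ReadPolicy::ValidFrom`
    /// constructor is available):
    ///
    /// ```ignore
    /// // Never compact `id` beyond `not_before`, regardless of its write frontier.
    /// storage_collections.set_read_policies(vec![(
    ///     id,
    ///     ReadPolicy::ValidFrom(Antichain::from_elem(not_before)),
    /// )]);
    /// ```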
337    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);
338
339    /// Acquires and returns the earliest possible read holds for the specified
340    /// collections.
341    fn acquire_read_holds(
342        &self,
343        desired_holds: Vec<GlobalId>,
344    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;
345
346    /// Get the time dependence for a storage collection. Returns no value if unknown or if
347    /// the object isn't managed by storage.
348    fn determine_time_dependence(
349        &self,
350        id: GlobalId,
351    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
352
353    /// Returns the state of [`StorageCollections`] formatted as JSON.
354    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
355}
356
357/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
358/// consolidated form.
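///
/// A consumption sketch (not compiled here), for a cursor obtained from
/// [StorageCollections::snapshot_cursor]:
///
/// ```ignore
/// let mut cursor = storage_collections.snapshot_cursor(id, as_of).await?;
/// while let Some(batch) = cursor.next().await {
///     for (data, t, diff) in batch {
///         // Consume as much or as little of the snapshot as needed; dropping
///         // the cursor releases its part leases.
///     }
/// }
/// ```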
359pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
360    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
361    // least as long as the cursor itself, which holds part leases. Bundling them together!
362    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
363    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
364}
365
366impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
367    pub async fn next(
368        &mut self,
369    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
370        let iter = self.cursor.next().await?;
371        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
372    }
373}
374
375/// Frontiers of the collection identified by `id`.
376#[derive(Debug)]
377pub struct CollectionFrontiers<T> {
378    /// The [GlobalId] of the collection that these frontiers belong to.
379    pub id: GlobalId,
380
381    /// The upper/write frontier of the collection.
382    pub write_frontier: Antichain<T>,
383
384    /// The since frontier that is implied by the collection's existence,
385    /// disregarding any read holds.
386    ///
387    /// Concretely, it is the since frontier that is implied by the combination
388    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
389    /// derived from the write frontier using the [ReadPolicy].
390    pub implied_capability: Antichain<T>,
391
392    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes the
393    /// implied capability.
394    pub read_capabilities: Antichain<T>,
395}
396
397/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
398/// background task for doing work concurrently.
399#[derive(Debug, Clone)]
400pub struct StorageCollectionsImpl<
401    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
402> {
403    /// The fencing token for this instance of [StorageCollections], and really
404    /// all of the controllers and Coordinator.
405    envd_epoch: NonZeroI64,
406
407    /// Whether or not this [StorageCollections] is in read-only mode.
408    ///
409    /// When in read-only mode, we are not allowed to effect changes to external
410    /// systems, including, for example, acquiring and downgrading critical
411    /// [SinceHandles](SinceHandle).
412    read_only: bool,
413
414    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
415    /// been persisted by the caller of [StorageCollections::prepare_state].
416    finalizable_shards: Arc<ShardIdSet>,
417
418    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
419    /// shards here until we are given a chance to let our callers know that
420    /// these have been finalized, for example via
421    /// [StorageCollections::prepare_state].
422    finalized_shards: Arc<ShardIdSet>,
423
424    /// Collections maintained by this [StorageCollections].
425    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
426
427    /// A shared TxnsCache running in a task and communicated with over a channel.
428    txns_read: TxnsRead<T>,
429
430    /// Storage configuration parameters.
431    config: Arc<Mutex<StorageConfiguration>>,
432
433    /// The upper of the txn shard as it was when we booted. We forward the
434    /// upper of created/registered tables to make sure that their uppers are at
435    /// least not less than the initially known txn upper.
436    ///
437    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
438    /// existing indexes when bootstrapping, where tables that have an upper
439    /// that is less than the initially known txn upper can lead to indexes that
440    /// cannot hydrate in read-only mode.
441    initial_txn_upper: Antichain<T>,
442
443    /// The persist location where all storage collections are being written to.
444    persist_location: PersistLocation,
445
446    /// A persist client used to write to storage collections.
447    persist: Arc<PersistClientCache>,
448
449    /// For sending commands to our internal task.
450    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
451
452    /// For sending updates about read holds to our internal task.
453    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
454
455    /// Handles to tasks we own, making sure they're dropped when we are.
456    _background_task: Arc<AbortOnDropHandle<()>>,
457    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
458}
459
460// Supporting methods for implementing [StorageCollections].
461//
462// Almost all internal methods that are the backing implementation for a trait
463// method have the `_inner` suffix.
464//
465// We follow a pattern where `_inner` methods get a mutable reference to the
466// shared collections state, and it's the public-facing method that locks the
467// state for the duration of its invocation. This allows calling other `_inner`
468// methods from within `_inner` methods.
469impl<T> StorageCollectionsImpl<T>
470where
471    T: TimelyTimestamp
472        + Lattice
473        + Codec64
474        + From<EpochMillis>
475        + TimestampManipulation
476        + Into<mz_repr::Timestamp>
477        + Sync,
478{
479    /// Creates and returns a new [StorageCollections].
480    ///
481    /// Note that when creating a new [StorageCollections], you must also
482    /// reconcile it with the previous state using
483    /// [StorageCollections::initialize_state],
484    /// [StorageCollections::prepare_state], and
485    /// [StorageCollections::create_collections_for_bootstrap].
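    ///
    /// The expected boot sequence is roughly (a sketch, not compiled here; the
    /// argument names are illustrative):
    ///
    /// ```ignore
    /// let collections = StorageCollectionsImpl::new(/* ... */).await;
    /// collections.initialize_state(&mut txn, init_ids).await?;
    /// collections
    ///     .prepare_state(&mut txn, ids_to_add, ids_to_drop, ids_to_register)
    ///     .await?;
    /// collections
    ///     .create_collections_for_bootstrap(&metadata, register_ts, descriptions, &migrated)
    ///     .await?;
    /// ```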
486    pub async fn new(
487        persist_location: PersistLocation,
488        persist_clients: Arc<PersistClientCache>,
489        metrics_registry: &MetricsRegistry,
490        _now: NowFn,
491        txns_metrics: Arc<TxnMetrics>,
492        envd_epoch: NonZeroI64,
493        read_only: bool,
494        connection_context: ConnectionContext,
495        txn: &dyn StorageTxn<T>,
496    ) -> Self {
497        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);
498
499        // This value must already be installed because we must ensure it's
500        // durably recorded before it is used; otherwise we risk leaking persist
501        // state.
502        let txns_id = txn
503            .get_txn_wal_shard()
504            .expect("must call prepare initialization before creating StorageCollections");
505
506        let txns_client = persist_clients
507            .open(persist_location.clone())
508            .await
509            .expect("location should be valid");
510
511        // We have to initialize, so that TxnsRead::start() below does not
512        // block.
513        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
514            TxnsHandle::open(
515                T::minimum(),
516                txns_client.clone(),
517                txns_client.dyncfgs().clone(),
518                Arc::clone(&txns_metrics),
519                txns_id,
520            )
521            .await;
522
523        // For handing to the background task, for listening to upper updates.
524        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
525        let mut txns_write = txns_client
526            .open_writer(
527                txns_id,
528                Arc::new(txns_key_schema),
529                Arc::new(txns_val_schema),
530                Diagnostics {
531                    shard_name: "txns".to_owned(),
532                    handle_purpose: "commit txns".to_owned(),
533                },
534            )
535            .await
536            .expect("txns schema shouldn't change");
537
538        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
539
540        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
541        let finalizable_shards =
542            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
543        let finalized_shards =
544            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
545        let config = Arc::new(Mutex::new(StorageConfiguration::new(
546            connection_context,
547            mz_dyncfgs::all_dyncfgs(),
548        )));
549
550        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();
551
552        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
553        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
554        let mut background_task = BackgroundTask {
555            config: Arc::clone(&config),
556            cmds_tx: cmd_tx.clone(),
557            cmds_rx: cmd_rx,
558            holds_rx,
559            collections: Arc::clone(&collections),
560            finalizable_shards: Arc::clone(&finalizable_shards),
561            shard_by_id: BTreeMap::new(),
562            since_handles: BTreeMap::new(),
563            txns_handle: Some(txns_write),
564            txns_shards: Default::default(),
565        };
566
567        let background_task =
568            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
569                background_task.run().await
570            });
571
572        let finalize_shards_task = mz_ore::task::spawn(
573            || "storage_collections::finalize_shards_task",
574            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
575                envd_epoch: envd_epoch.clone(),
576                config: Arc::clone(&config),
577                metrics,
578                finalizable_shards: Arc::clone(&finalizable_shards),
579                finalized_shards: Arc::clone(&finalized_shards),
580                persist_location: persist_location.clone(),
581                persist: Arc::clone(&persist_clients),
582                read_only,
583            }),
584        );
585
586        Self {
587            finalizable_shards,
588            finalized_shards,
589            collections,
590            txns_read,
591            envd_epoch,
592            read_only,
593            config,
594            initial_txn_upper,
595            persist_location,
596            persist: persist_clients,
597            cmd_tx,
598            holds_tx,
599            _background_task: Arc::new(background_task.abort_on_drop()),
600            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
601        }
602    }
603
604    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
605    ///
606    /// `since` is an optional since that the read handle will be forwarded to
607    /// if it is less than its current since.
608    ///
609    /// This will `halt!` the process if we cannot successfully acquire a
610    /// critical handle with our current epoch.
611    async fn open_data_handles(
612        &self,
613        id: &GlobalId,
614        shard: ShardId,
615        since: Option<&Antichain<T>>,
616        relation_desc: RelationDesc,
617        persist_client: &PersistClient,
618    ) -> (
619        WriteHandle<SourceData, (), T, StorageDiff>,
620        SinceHandleWrapper<T>,
621    ) {
622        let since_handle = if self.read_only {
623            let read_handle = self
624                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
625                .await;
626            SinceHandleWrapper::Leased(read_handle)
627        } else {
628            // We're managing the data for this shard in read-write mode, which would fence out other
629            // processes in read-only mode; it's safe to upgrade the metadata version.
630            persist_client
631                .upgrade_version::<SourceData, (), T, StorageDiff>(
632                    shard,
633                    Diagnostics {
634                        shard_name: id.to_string(),
635                        handle_purpose: format!("controller data for {}", id),
636                    },
637                )
638                .await
639                .expect("invalid persist usage");
640
641            let since_handle = self
642                .open_critical_handle(id, shard, since, persist_client)
643                .await;
644
645            SinceHandleWrapper::Critical(since_handle)
646        };
647
648        let mut write_handle = self
649            .open_write_handle(id, shard, relation_desc, persist_client)
650            .await;
651
652        // N.B.
653        // Fetch the most recent upper for the write handle. Otherwise, this may
654        // be behind the since of the since handle. It's vital this happens AFTER
655        // we create the since handle as it needs to be linearized with that
656        // operation. It may be true that creating the write handle after the
657        // since handle already ensures this, but we do this out of an abundance
658        // of caution.
659        //
660        // Note that this returns the upper, but also sets it on the handle to
661        // be fetched later.
662        write_handle.fetch_recent_upper().await;
663
664        (write_handle, since_handle)
665    }
666
667    /// Opens a write handle for the given `shard`.
668    async fn open_write_handle(
669        &self,
670        id: &GlobalId,
671        shard: ShardId,
672        relation_desc: RelationDesc,
673        persist_client: &PersistClient,
674    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
675        let diagnostics = Diagnostics {
676            shard_name: id.to_string(),
677            handle_purpose: format!("controller data for {}", id),
678        };
679
680        let write = persist_client
681            .open_writer(
682                shard,
683                Arc::new(relation_desc),
684                Arc::new(UnitSchema),
685                diagnostics.clone(),
686            )
687            .await
688            .expect("invalid persist usage");
689
690        write
691    }
692
693    /// Opens a critical since handle for the given `shard`.
694    ///
695    /// `since` is an optional since that the read handle will be forwarded to
696    /// if it is less than its current since.
697    ///
698    /// This will `halt!` the process if we cannot successfully acquire a
699    /// critical handle with our current epoch.
700    async fn open_critical_handle(
701        &self,
702        id: &GlobalId,
703        shard: ShardId,
704        since: Option<&Antichain<T>>,
705        persist_client: &PersistClient,
706    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
707        tracing::debug!(%id, ?since, "opening critical handle");
708
709        assert!(
710            !self.read_only,
711            "attempting to open critical SinceHandle in read-only mode"
712        );
713
714        let diagnostics = Diagnostics {
715            shard_name: id.to_string(),
716            handle_purpose: format!("controller data for {}", id),
717        };
718
719        // Construct the handle in a separate block to ensure all error paths
720        // are diverging
721        let since_handle = {
722            // This block's aim is to ensure the handle is in terms of our epoch
723            // by the time we return it.
724            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
725                .open_critical_since(
726                    shard,
727                    PersistClient::CONTROLLER_CRITICAL_SINCE,
728                    diagnostics.clone(),
729                )
730                .await
731                .expect("invalid persist usage");
732
733            // Take the join of the handle's since and the provided `since`;
734            // this lets materialized views express the since at which their
735            // read handles "start."
736            let provided_since = match since {
737                Some(since) => since,
738                None => &Antichain::from_elem(T::minimum()),
739            };
740            let since = handle.since().join(provided_since);
741
742            let our_epoch = self.envd_epoch;
743
744            loop {
745                let current_epoch: PersistEpoch = handle.opaque().clone();
746
747                // Ensure the current epoch is <= our epoch.
748                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);
749
750                if unchecked_success {
751                    // Update the handle's state so that it is in terms of our
752                    // epoch.
753                    let checked_success = handle
754                        .compare_and_downgrade_since(
755                            &current_epoch,
756                            (&PersistEpoch::from(our_epoch), &since),
757                        )
758                        .await
759                        .is_ok();
760                    if checked_success {
761                        break handle;
762                    }
763                } else {
764                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
765                }
766            }
767        };
768
769        since_handle
770    }
771
772    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
773    /// for the given `shard`.
774    ///
775    /// `since` is an optional since that the read handle will be forwarded to
776    /// if it is less than its current since.
777    async fn open_leased_handle(
778        &self,
779        id: &GlobalId,
780        shard: ShardId,
781        relation_desc: RelationDesc,
782        since: Option<&Antichain<T>>,
783        persist_client: &PersistClient,
784    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
785        tracing::debug!(%id, ?since, "opening leased handle");
786
787        let diagnostics = Diagnostics {
788            shard_name: id.to_string(),
789            handle_purpose: format!("controller data for {}", id),
790        };
791
792        let use_critical_since = false;
793        let mut handle: ReadHandle<_, _, _, _> = persist_client
794            .open_leased_reader(
795                shard,
796                Arc::new(relation_desc),
797                Arc::new(UnitSchema),
798                diagnostics.clone(),
799                use_critical_since,
800            )
801            .await
802            .expect("invalid persist usage");
803
804        // Take the join of the handle's since and the provided `since`;
805        // this lets materialized views express the since at which their
806        // read handles "start."
807        let provided_since = match since {
808            Some(since) => since,
809            None => &Antichain::from_elem(T::minimum()),
810        };
811        let since = handle.since().join(provided_since);
812
813        handle.downgrade_since(&since).await;
814
815        handle
816    }
817
818    fn register_handles(
819        &self,
820        id: GlobalId,
821        is_in_txns: bool,
822        since_handle: SinceHandleWrapper<T>,
823        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
824    ) {
825        self.send(BackgroundCmd::Register {
826            id,
827            is_in_txns,
828            since_handle,
829            write_handle,
830        });
831    }
832
833    fn send(&self, cmd: BackgroundCmd<T>) {
834        let _ = self.cmd_tx.send(cmd);
835    }
836
837    async fn snapshot_stats_inner(
838        &self,
839        id: GlobalId,
840        as_of: SnapshotStatsAsOf<T>,
841    ) -> Result<SnapshotStats, StorageError<T>> {
842        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
843        // caller of this one drives it to completion.
844        //
845        // We'd need to either share the critical handle somehow or maybe have
846        // two instances around, one in the worker and one in the
847        // StorageCollections.
848        let (tx, rx) = oneshot::channel();
849        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
850        rx.await.expect("BackgroundTask should be live").0.await
851    }
852
853    /// If this identified collection has a dependency, install a read hold on
854    /// it.
855    ///
856    /// This is necessary to ensure that the dependency's since does not advance
857    /// beyond its dependents'.
858    fn install_collection_dependency_read_holds_inner(
859        &self,
860        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
861        id: GlobalId,
862    ) -> Result<(), StorageError<T>> {
863        let (deps, collection_implied_capability) = match self_collections.get(&id) {
864            Some(CollectionState {
865                storage_dependencies: deps,
866                implied_capability,
867                ..
868            }) => (deps.clone(), implied_capability),
869            _ => return Ok(()),
870        };
871
872        for dep in deps.iter() {
873            let dep_collection = self_collections
874                .get(dep)
875                .ok_or(StorageError::IdentifierMissing(id))?;
876
877            mz_ore::soft_assert_or_log!(
878                PartialOrder::less_equal(
879                    &dep_collection.implied_capability,
880                    collection_implied_capability
881                ),
882                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
883                dep_collection.implied_capability,
884                collection_implied_capability,
885            );
886        }
887
888        self.install_read_capabilities_inner(
889            self_collections,
890            id,
891            &deps,
892            collection_implied_capability.clone(),
893        )?;
894
895        Ok(())
896    }
897
898    /// Returns the given collection's dependencies.
899    fn determine_collection_dependencies(
900        &self,
901        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
902        source_id: GlobalId,
903        collection_desc: &CollectionDescription<T>,
904    ) -> Result<Vec<GlobalId>, StorageError<T>> {
905        let mut dependencies = Vec::new();
906
907        if let Some(id) = collection_desc.primary {
908            dependencies.push(id);
909        }
910
911        match &collection_desc.data_source {
912            DataSource::Introspection(_)
913            | DataSource::Webhook
914            | DataSource::Table
915            | DataSource::Progress
916            | DataSource::Other => (),
917            DataSource::IngestionExport {
918                ingestion_id,
919                data_config,
920                ..
921            } => {
922                // Ingestion exports depend on their primary source's remap
923                // collection, except when they use a CDCv2 envelope.
924                let source = self_collections
925                    .get(ingestion_id)
926                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
927                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
928                    panic!("SourceExport must refer to a primary source that already exists");
929                };
930
931                match data_config.envelope {
932                    SourceEnvelope::CdcV2 => (),
933                    _ => dependencies.push(ingestion.remap_collection_id),
934                }
935            }
936            // Ingestions depend on their remap collection.
937            DataSource::Ingestion(ingestion) => {
938                if ingestion.remap_collection_id != source_id {
939                    dependencies.push(ingestion.remap_collection_id);
940                }
941            }
942            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
943        }
944
945        Ok(dependencies)
946    }
947
948    /// Install read capabilities on the given `storage_dependencies`.
949    #[instrument(level = "debug")]
950    fn install_read_capabilities_inner(
951        &self,
952        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
953        from_id: GlobalId,
954        storage_dependencies: &[GlobalId],
955        read_capability: Antichain<T>,
956    ) -> Result<(), StorageError<T>> {
957        let mut changes = ChangeBatch::new();
958        for time in read_capability.iter() {
959            changes.update(time.clone(), 1);
960        }
961
962        if tracing::span_enabled!(tracing::Level::TRACE) {
963            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
964            let user_capabilities = self_collections
965                .iter_mut()
966                .filter(|(id, _c)| id.is_user())
967                .map(|(id, c)| {
968                    let updates = c.read_capabilities.updates().cloned().collect_vec();
969                    (*id, c.implied_capability.clone(), updates)
970                })
971                .collect_vec();
972
973            trace!(
974                %from_id,
975                ?storage_dependencies,
976                ?read_capability,
977                ?user_capabilities,
978                "install_read_capabilities_inner");
979        }
980
981        let mut storage_read_updates = storage_dependencies
982            .iter()
983            .map(|id| (*id, changes.clone()))
984            .collect();
985
986        StorageCollectionsImpl::update_read_capabilities_inner(
987            &self.cmd_tx,
988            self_collections,
989            &mut storage_read_updates,
990        );
991
992        if tracing::span_enabled!(tracing::Level::TRACE) {
993            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
994            let user_capabilities = self_collections
995                .iter_mut()
996                .filter(|(id, _c)| id.is_user())
997                .map(|(id, c)| {
998                    let updates = c.read_capabilities.updates().cloned().collect_vec();
999                    (*id, c.implied_capability.clone(), updates)
1000                })
1001                .collect_vec();
1002
1003            trace!(
1004                %from_id,
1005                ?storage_dependencies,
1006                ?read_capability,
1007                ?user_capabilities,
1008                "after install_read_capabilities_inner!");
1009        }
1010
1011        Ok(())
1012    }
1013
1014    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
1015        let metadata = &self.collection_metadata(id)?;
1016        let persist_client = self
1017            .persist
1018            .open(metadata.persist_location.clone())
1019            .await
1020            .unwrap();
1021        // Duplicate part of open_data_handles here because we don't need the
1022        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
1023        let diagnostics = Diagnostics {
1024            shard_name: id.to_string(),
1025            handle_purpose: format!("controller data for {}", id),
1026        };
1027        // NB: Opening a WriteHandle is cheap if it's never used in a
1028        // compare_and_append operation.
1029        let write = persist_client
1030            .open_writer::<SourceData, (), T, StorageDiff>(
1031                metadata.data_shard,
1032                Arc::new(metadata.relation_desc.clone()),
1033                Arc::new(UnitSchema),
1034                diagnostics.clone(),
1035            )
1036            .await
1037            .expect("invalid persist usage");
1038        Ok(write.shared_upper())
1039    }
1040
1041    async fn read_handle_for_snapshot(
1042        persist: Arc<PersistClientCache>,
1043        metadata: &CollectionMetadata,
1044        id: GlobalId,
1045    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1046        let persist_client = persist
1047            .open(metadata.persist_location.clone())
1048            .await
1049            .unwrap();
1050
1051        // We create a new read handle every time someone requests a snapshot
1052        // and then immediately expire it instead of keeping a read handle
1053        // permanently in our state to avoid having it heartbeat continually.
1054        // The assumption is that calls to snapshot are rare, so it is worth
1055        // always creating a new handle.
1056        let read_handle = persist_client
1057            .open_leased_reader::<SourceData, (), _, _>(
1058                metadata.data_shard,
1059                Arc::new(metadata.relation_desc.clone()),
1060                Arc::new(UnitSchema),
1061                Diagnostics {
1062                    shard_name: id.to_string(),
1063                    handle_purpose: format!("snapshot {}", id),
1064                },
1065                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1066            )
1067            .await
1068            .expect("invalid persist usage");
1069        Ok(read_handle)
1070    }
1071
1072    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1073    // where the as_of frontier might have multiple elements. In the current form the mutually
1074    // incomparable updates will be accumulated together to a state of the collection that never
1075    // actually existed. We should include the original time in the updates advanced by the as_of
1076    // frontier in the result and let the caller decide what to do with the information.
1077    fn snapshot(
1078        &self,
1079        id: GlobalId,
1080        as_of: T,
1081        txns_read: &TxnsRead<T>,
1082    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
1083    where
1084        T: Codec64 + From<EpochMillis> + TimestampManipulation,
1085    {
1086        let metadata = match self.collection_metadata(id) {
1087            Ok(metadata) => metadata.clone(),
1088            Err(e) => return async { Err(e.into()) }.boxed(),
1089        };
1090        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1091            assert_eq!(txns_id, txns_read.txns_id());
1092            txns_read.clone()
1093        });
1094        let persist = Arc::clone(&self.persist);
1095        async move {
1096            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1097            let contents = match txns_read {
1098                None => {
1099                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1100                    read_handle
1101                        .snapshot_and_fetch(Antichain::from_elem(as_of))
1102                        .await
1103                }
1104                Some(txns_read) => {
1105                    // We _are_ using txn-wal for tables. It advances the physical upper of the
1106                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
1107                    // unblocked.
1108                    //
1109                    // Consider the following scenario:
1110                    // - Table A is written to via txns at time 5
1111                    // - Tables other than A are written to via txns consuming timestamps up to 10
1112                    // - We'd like to read A at 7
1113                    // - The application process of A's txn has advanced the upper to 5+1, but we need
1114                    //   it to be past 7; the txns shard, however, knows that (5,10) is empty of writes to A
1115                    // - This branch allows it to handle that advancing the physical upper of Table A to
1116                    //   10 (NB but only once we see it get past the write at 5!)
1117                    // - Then we can read it normally.
1118                    txns_read.update_gt(as_of.clone()).await;
1119                    let data_snapshot = txns_read
1120                        .data_snapshot(metadata.data_shard, as_of.clone())
1121                        .await;
1122                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
1123                }
1124            };
1125            match contents {
1126                Ok(contents) => {
1127                    let mut snapshot = Vec::with_capacity(contents.len());
1128                    for ((data, _), _, diff) in contents {
1129                        // TODO(petrosagg): We should accumulate the errors too and let the user
1130                        // interpret the result
1131                        let row = data.0?;
1132                        snapshot.push((row, diff));
1133                    }
1134                    Ok(snapshot)
1135                }
1136                Err(_) => Err(StorageError::ReadBeforeSince(id)),
1137            }
1138        }
1139        .boxed()
1140    }
1141
1142    fn snapshot_and_stream(
1143        &self,
1144        id: GlobalId,
1145        as_of: T,
1146        txns_read: &TxnsRead<T>,
1147    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
1148    {
1149        use futures::stream::StreamExt;
1150
1151        let metadata = match self.collection_metadata(id) {
1152            Ok(metadata) => metadata.clone(),
1153            Err(e) => return async { Err(e.into()) }.boxed(),
1154        };
1155        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1156            assert_eq!(txns_id, txns_read.txns_id());
1157            txns_read.clone()
1158        });
1159        let persist = Arc::clone(&self.persist);
1160
1161        async move {
1162            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1163            let stream = match txns_read {
1164                None => {
1165                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1166                    read_handle
1167                        .snapshot_and_stream(Antichain::from_elem(as_of))
1168                        .await
1169                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1170                        .boxed()
1171                }
1172                Some(txns_read) => {
1173                    txns_read.update_gt(as_of.clone()).await;
1174                    let data_snapshot = txns_read
1175                        .data_snapshot(metadata.data_shard, as_of.clone())
1176                        .await;
1177                    data_snapshot
1178                        .snapshot_and_stream(&mut read_handle)
1179                        .await
1180                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1181                        .boxed()
1182                }
1183            };
1184
1185            // Map our stream, dropping the unit value from each update.
1186            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
1187            Ok(stream)
1188        }
1189        .boxed()
1190    }
1191
1192    fn set_read_policies_inner(
1193        &self,
1194        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1195        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1196    ) {
1197        trace!("set_read_policies: {:?}", policies);
1198
1199        let mut read_capability_changes = BTreeMap::default();
1200
1201        for (id, policy) in policies.into_iter() {
1202            let collection = match collections.get_mut(&id) {
1203                Some(c) => c,
1204                None => {
1205                    panic!("Reference to absent collection {id}");
1206                }
1207            };
1208
1209            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1210
1211            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1212                let mut update = ChangeBatch::new();
1213                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1214                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1215                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1216                if !update.is_empty() {
1217                    read_capability_changes.insert(id, update);
1218                }
1219            }
1220
1221            collection.read_policy = policy;
1222        }
1223
1224        for (id, changes) in read_capability_changes.iter() {
1225            if id.is_user() {
1226                trace!(%id, ?changes, "in set_read_policies, capability changes");
1227            }
1228        }
1229
1230        if !read_capability_changes.is_empty() {
1231            StorageCollectionsImpl::update_read_capabilities_inner(
1232                &self.cmd_tx,
1233                collections,
1234                &mut read_capability_changes,
1235            );
1236        }
1237    }
1238
1239    // This is not an associated function so that we can share it with the task
1240    // that updates the persist handles and also has a reference to the shared
1241    // collections state.
1242    fn update_read_capabilities_inner(
1243        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
1244        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1245        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
1246    ) {
1247        // Location to record consequences that we need to act on.
1248        let mut collections_net = BTreeMap::new();
1249
1250        // We must not rely on any specific relative ordering of `GlobalId`s.
1251        // That said, it is reasonable to assume that collections generally have
1252        // greater IDs than their dependencies, so starting with the largest is
1253        // a useful optimization.
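        //
        // For example, when a read hold on an ingestion export is released,
        // the resulting negative change is applied to that collection here and
        // then forwarded to its remap-shard dependency below, allowing the
        // dependency's since to advance as well.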
1254        while let Some(id) = updates.keys().rev().next().cloned() {
1255            let mut update = updates.remove(&id).unwrap();
1256
1257            if id.is_user() {
1258                trace!(id = ?id, update = ?update, "update_read_capabilities");
1259            }
1260
1261            let collection = if let Some(c) = collections.get_mut(&id) {
1262                c
1263            } else {
1264                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
1265                if has_positive_updates {
1266                    panic!(
1267                        "reference to absent collection {id} but we have positive updates: {:?}",
1268                        update
1269                    );
1270                } else {
1271                    // Continue purely negative updates. Someone has probably
1272                    // already dropped this collection!
1273                    continue;
1274                }
1275            };
1276
1277            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
1278            for (time, diff) in update.iter() {
1279                assert!(
1280                    collection.read_capabilities.count_for(time) + diff >= 0,
1281                    "update {:?} for collection {id} would lead to negative \
1282                        read capabilities, read capabilities before applying: {:?}",
1283                    update,
1284                    collection.read_capabilities
1285                );
1286
1287                if collection.read_capabilities.count_for(time) + diff > 0 {
1288                    assert!(
1289                        current_read_capabilities.less_equal(time),
1290                        "update {:?} for collection {id} is trying to \
1291                            install read capabilities before the current \
1292                            frontier of read capabilities, read capabilities before applying: {:?}",
1293                        update,
1294                        collection.read_capabilities
1295                    );
1296                }
1297            }
1298
1299            let changes = collection.read_capabilities.update_iter(update.drain());
1300            update.extend(changes);
1301
1302            if id.is_user() {
1303                trace!(
1304                %id,
1305                ?collection.storage_dependencies,
1306                ?update,
1307                "forwarding update to storage dependencies");
1308            }
1309
1310            for id in collection.storage_dependencies.iter() {
1311                updates
1312                    .entry(*id)
1313                    .or_insert_with(ChangeBatch::new)
1314                    .extend(update.iter().cloned());
1315            }
1316
1317            let (changes, frontier) = collections_net
1318                .entry(id)
1319                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));
1320
1321            changes.extend(update.drain());
1322            *frontier = collection.read_capabilities.frontier().to_owned();
1323        }
1324
1325        // Translate our net compute actions into downgrades of persist sinces.
1326        // The actual downgrades are performed by a Tokio task asynchronously.
1327        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
1328        for (key, (mut changes, frontier)) in collections_net {
1329            if !changes.is_empty() {
1330                // If the collection has a "primary" collection, let that primary drive compaction.
1331                let collection = collections.get(&key).expect("must still exist");
1332                let should_emit_persist_compaction = collection.description.primary.is_none();
1333
1334                if frontier.is_empty() {
1335                    info!(id = %key, "removing collection state because the since advanced to []!");
1336                    collections.remove(&key).expect("must still exist");
1337                }
1338
1339                if should_emit_persist_compaction {
1340                    persist_compaction_commands.push((key, frontier));
1341                }
1342            }
1343        }
1344
1345        if !persist_compaction_commands.is_empty() {
1346            cmd_tx
1347                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
1348                .expect("cannot fail to send");
1349        }
1350    }
1351
1352    /// Drop finalized shards that the durable metadata no longer lists as unfinalized.
1353    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1354        self.finalized_shards
1355            .lock()
1356            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1357    }
1358}
1359
1360// See comments on the above impl for StorageCollectionsImpl.
1361#[async_trait]
1362impl<T> StorageCollections for StorageCollectionsImpl<T>
1363where
1364    T: TimelyTimestamp
1365        + Lattice
1366        + Codec64
1367        + From<EpochMillis>
1368        + TimestampManipulation
1369        + Into<mz_repr::Timestamp>
1370        + Sync,
1371{
1372    type Timestamp = T;
1373
1374    async fn initialize_state(
1375        &self,
1376        txn: &mut (dyn StorageTxn<T> + Send),
1377        init_ids: BTreeSet<GlobalId>,
1378    ) -> Result<(), StorageError<T>> {
1379        let metadata = txn.get_collection_metadata();
1380        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1381
1382        // Determine which collections we do not yet have metadata for.
1383        let new_collections: BTreeSet<GlobalId> =
1384            init_ids.difference(&existing_metadata).cloned().collect();
1385
1386        self.prepare_state(
1387            txn,
1388            new_collections,
1389            BTreeSet::default(),
1390            BTreeMap::default(),
1391        )
1392        .await?;
1393
1394        // All shards that belong to collections dropped in the last epoch are
1395        // eligible for finalization.
1396        //
1397        // n.b. this introduces an unlikely race condition: if a collection is
1398        // dropped from the catalog, but the dataflow is still running on a
1399        // worker, assuming the shard is safe to finalize on reboot may cause
1400        // the cluster to panic.
1401        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1402
1403        info!(?unfinalized_shards, "initializing finalizable_shards");
1404
1405        self.finalizable_shards.lock().extend(unfinalized_shards);
1406
1407        Ok(())
1408    }
1409
1410    fn update_parameters(&self, config_params: StorageParameters) {
1411        // We serialize the dyncfg updates in StorageParameters, but configure
1412        // persist separately.
1413        config_params.dyncfg_updates.apply(self.persist.cfg());
1414
1415        self.config
1416            .lock()
1417            .expect("lock poisoned")
1418            .update(config_params);
1419    }
1420
1421    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1422        let collections = self.collections.lock().expect("lock poisoned");
1423
1424        collections
1425            .get(&id)
1426            .map(|c| c.collection_metadata.clone())
1427            .ok_or(CollectionMissing(id))
1428    }
1429
1430    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1431        let collections = self.collections.lock().expect("lock poisoned");
1432
1433        collections
1434            .iter()
1435            .filter(|(_id, c)| !c.is_dropped())
1436            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1437            .collect()
1438    }
1439
1440    fn collections_frontiers(
1441        &self,
1442        ids: Vec<GlobalId>,
1443    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
1444        if ids.is_empty() {
1445            return Ok(vec![]);
1446        }
1447
1448        let collections = self.collections.lock().expect("lock poisoned");
1449
1450        let res = ids
1451            .into_iter()
1452            .map(|id| {
1453                collections
1454                    .get(&id)
1455                    .map(|c| CollectionFrontiers {
1456                        id: id.clone(),
1457                        write_frontier: c.write_frontier.clone(),
1458                        implied_capability: c.implied_capability.clone(),
1459                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1460                    })
1461                    .ok_or(CollectionMissing(id))
1462            })
1463            .collect::<Result<Vec<_>, _>>()?;
1464
1465        Ok(res)
1466    }
1467
1468    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1469        let collections = self.collections.lock().expect("lock poisoned");
1470
1471        let res = collections
1472            .iter()
1473            .filter(|(_id, c)| !c.is_dropped())
1474            .map(|(id, c)| CollectionFrontiers {
1475                id: id.clone(),
1476                write_frontier: c.write_frontier.clone(),
1477                implied_capability: c.implied_capability.clone(),
1478                read_capabilities: c.read_capabilities.frontier().to_owned(),
1479            })
1480            .collect_vec();
1481
1482        res
1483    }
1484
1485    async fn snapshot_stats(
1486        &self,
1487        id: GlobalId,
1488        as_of: Antichain<Self::Timestamp>,
1489    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1490        let metadata = self.collection_metadata(id)?;
1491
1492        // See the comments in StorageController::snapshot for what's going on
1493        // here.
1494        let as_of = match metadata.txns_shard.as_ref() {
1495            None => SnapshotStatsAsOf::Direct(as_of),
1496            Some(txns_id) => {
1497                assert_eq!(txns_id, self.txns_read.txns_id());
1498                let as_of = as_of
1499                    .into_option()
1500                    .expect("cannot read as_of the empty antichain");
1501                self.txns_read.update_gt(as_of.clone()).await;
1502                let data_snapshot = self
1503                    .txns_read
1504                    .data_snapshot(metadata.data_shard, as_of.clone())
1505                    .await;
1506                SnapshotStatsAsOf::Txns(data_snapshot)
1507            }
1508        };
1509        self.snapshot_stats_inner(id, as_of).await
1510    }
1511
1512    async fn snapshot_parts_stats(
1513        &self,
1514        id: GlobalId,
1515        as_of: Antichain<Self::Timestamp>,
1516    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
1517        let metadata = {
1518            let self_collections = self.collections.lock().expect("lock poisoned");
1519
1520            let collection_metadata = self_collections
1521                .get(&id)
1522                .ok_or(StorageError::IdentifierMissing(id))
1523                .map(|c| c.collection_metadata.clone());
1524
1525            match collection_metadata {
1526                Ok(m) => m,
1527                Err(e) => return Box::pin(async move { Err(e) }),
1528            }
1529        };
1530
1531        // See the comments in StorageController::snapshot for what's going on
1532        // here.
1533        let persist = Arc::clone(&self.persist);
1534        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;
1535
1536        let data_snapshot = match (metadata, as_of.as_option()) {
1537            (
1538                CollectionMetadata {
1539                    txns_shard: Some(txns_id),
1540                    data_shard,
1541                    ..
1542                },
1543                Some(as_of),
1544            ) => {
1545                assert_eq!(txns_id, *self.txns_read.txns_id());
1546                self.txns_read.update_gt(as_of.clone()).await;
1547                let data_snapshot = self
1548                    .txns_read
1549                    .data_snapshot(data_shard, as_of.clone())
1550                    .await;
1551                Some(data_snapshot)
1552            }
1553            _ => None,
1554        };
1555
1556        Box::pin(async move {
1557            let read_handle = read_handle?;
1558            let result = match data_snapshot {
1559                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
1560                None => read_handle.snapshot_parts_stats(as_of).await,
1561            };
1562            read_handle.expire().await;
1563            result.map_err(|_| StorageError::ReadBeforeSince(id))
1564        })
1565    }
1566
1567    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1568    // where the as_of frontier might have multiple elements. In the current form the mutually
1569    // incomparable updates will be accumulated together to a state of the collection that never
1570    // actually existed. We should include the original time in the updates advanced by the as_of
1571    // frontier in the result and let the caller decide what to do with the information.
1572    fn snapshot(
1573        &self,
1574        id: GlobalId,
1575        as_of: Self::Timestamp,
1576    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
1577        self.snapshot(id, as_of, &self.txns_read)
1578    }
1579
1580    async fn snapshot_latest(
1581        &self,
1582        id: GlobalId,
1583    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1584        let upper = self.recent_upper(id).await?;
1585        let res = match upper.as_option() {
1586            Some(f) if f > &T::minimum() => {
1587                let as_of = f.step_back().unwrap();
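                // For example (totally ordered, hypothetical values): if the
                // upper is `[11]`, every time `< 11` is complete, so we read a
                // snapshot at `as_of = 10`.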
1588
1589                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1590                snapshot
1591                    .into_iter()
1592                    .map(|(row, diff)| {
1593                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1594                        row
1595                    })
1596                    .collect()
1597            }
1598            Some(_min) => {
1599                // The collection must be empty!
1600                Vec::new()
1601            }
1602            // The collection is closed; we cannot determine a latest read
1603            // timestamp based on the upper.
1604            _ => {
1605                return Err(StorageError::InvalidUsage(
1606                    "collection closed, cannot determine a read timestamp based on the upper"
1607                        .to_string(),
1608                ));
1609            }
1610        };
1611
1612        Ok(res)
1613    }
1614
1615    fn snapshot_cursor(
1616        &self,
1617        id: GlobalId,
1618        as_of: Self::Timestamp,
1619    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
1620    where
1621        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
1622    {
1623        let metadata = match self.collection_metadata(id) {
1624            Ok(metadata) => metadata.clone(),
1625            Err(e) => return async { Err(e.into()) }.boxed(),
1626        };
1627        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1628            // Ensure the txns shard the controller has is the same one that
1629            // this collection is registered to.
1630            assert_eq!(txns_id, self.txns_read.txns_id());
1631            self.txns_read.clone()
1632        });
1633        let persist = Arc::clone(&self.persist);
1634
1635        // See the comments in Self::snapshot for what's going on here.
1636        async move {
1637            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1638            let cursor = match txns_read {
1639                None => {
1640                    let cursor = handle
1641                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1642                        .await
1643                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1644                    SnapshotCursor {
1645                        _read_handle: handle,
1646                        cursor,
1647                    }
1648                }
1649                Some(txns_read) => {
1650                    txns_read.update_gt(as_of.clone()).await;
1651                    let data_snapshot = txns_read
1652                        .data_snapshot(metadata.data_shard, as_of.clone())
1653                        .await;
1654                    let cursor = data_snapshot
1655                        .snapshot_cursor(&mut handle, |_| true)
1656                        .await
1657                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1658                    SnapshotCursor {
1659                        _read_handle: handle,
1660                        cursor,
1661                    }
1662                }
1663            };
1664
1665            Ok(cursor)
1666        }
1667        .boxed()
1668    }
1669
1670    fn snapshot_and_stream(
1671        &self,
1672        id: GlobalId,
1673        as_of: Self::Timestamp,
1674    ) -> BoxFuture<
1675        'static,
1676        Result<
1677            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
1678            StorageError<Self::Timestamp>,
1679        >,
1680    >
1681    where
1682        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
1683    {
1684        self.snapshot_and_stream(id, as_of, &self.txns_read)
1685    }
1686
1687    fn create_update_builder(
1688        &self,
1689        id: GlobalId,
1690    ) -> BoxFuture<
1691        'static,
1692        Result<
1693            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1694            StorageError<Self::Timestamp>,
1695        >,
1696    > {
1697        let metadata = match self.collection_metadata(id) {
1698            Ok(m) => m,
1699            Err(e) => return Box::pin(async move { Err(e.into()) }),
1700        };
1701        let persist = Arc::clone(&self.persist);
1702
1703        async move {
1704            let persist_client = persist
1705                .open(metadata.persist_location.clone())
1706                .await
1707                .expect("invalid persist usage");
1708            let write_handle = persist_client
1709                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1710                    metadata.data_shard,
1711                    Arc::new(metadata.relation_desc.clone()),
1712                    Arc::new(UnitSchema),
1713                    Diagnostics {
1714                        shard_name: id.to_string(),
1715                        handle_purpose: format!("create write batch {}", id),
1716                    },
1717                )
1718                .await
1719                .expect("invalid persist usage");
1720            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1721
1722            Ok(builder)
1723        }
1724        .boxed()
1725    }
1726
1727    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1728        let collections = self.collections.lock().expect("lock poisoned");
1729
1730        if collections.contains_key(&id) {
1731            Ok(())
1732        } else {
1733            Err(StorageError::IdentifierMissing(id))
1734        }
1735    }
1736
1737    async fn prepare_state(
1738        &self,
1739        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1740        ids_to_add: BTreeSet<GlobalId>,
1741        ids_to_drop: BTreeSet<GlobalId>,
1742        ids_to_register: BTreeMap<GlobalId, ShardId>,
1743    ) -> Result<(), StorageError<T>> {
1744        txn.insert_collection_metadata(
1745            ids_to_add
1746                .into_iter()
1747                .map(|id| (id, ShardId::new()))
1748                .collect(),
1749        )?;
1750        txn.insert_collection_metadata(ids_to_register)?;
1751
1752        // Delete the metadata for any dropped collections.
1753        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1754
1755        let dropped_shards = dropped_mappings
1756            .into_iter()
1757            .map(|(_id, shard)| shard)
1758            .collect();
1759
1760        txn.insert_unfinalized_shards(dropped_shards)?;
1761
1762        // Reconcile any shards we've successfully finalized with the shard
1763        // finalization collection.
1764        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1765        txn.mark_shards_as_finalized(finalized_shards);
1766
1767        Ok(())
1768    }
1769
1770    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
1771    // a method/move individual parts to their own methods.
1772    #[instrument(level = "debug")]
1773    async fn create_collections_for_bootstrap(
1774        &self,
1775        storage_metadata: &StorageMetadata,
1776        register_ts: Option<Self::Timestamp>,
1777        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
1778        migrated_storage_collections: &BTreeSet<GlobalId>,
1779    ) -> Result<(), StorageError<Self::Timestamp>> {
1780        let is_in_txns = |id, metadata: &CollectionMetadata| {
1781            metadata.txns_shard.is_some()
1782                && !(self.read_only && migrated_storage_collections.contains(&id))
1783        };
1784
1785        // Validate first, to avoid corrupting state. It is an error to:
1786        // 1. create a dropped identifier, or
1787        // 2. create an existing identifier with a new description.
1788        // Make sure to check for errors within `ingestions` as well.
1789        collections.sort_by_key(|(id, _)| *id);
1790        collections.dedup();
1791        for pos in 1..collections.len() {
1792            if collections[pos - 1].0 == collections[pos].0 {
1793                return Err(StorageError::CollectionIdReused(collections[pos].0));
1794            }
1795        }
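        // To illustrate with hypothetical input: `[(u1, d1), (u1, d1), (u1, d2)]`
        // first dedups to `[(u1, d1), (u1, d2)]`; the adjacent-pair scan above
        // then reports `CollectionIdReused(u1)`, because the same id was
        // requested with two different descriptions.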
1796
1797        {
1798            // Early sanity check: if we already know about a collection, its
1799            // description must match!
1800            //
1801            // NOTE: There could be concurrent modifications to
1802            // `self.collections`, but this sanity check is better than nothing.
1803            let self_collections = self.collections.lock().expect("lock poisoned");
1804            for (id, description) in collections.iter() {
1805                if let Some(existing_collection) = self_collections.get(id) {
1806                    if &existing_collection.description != description {
1807                        return Err(StorageError::CollectionIdReused(*id));
1808                    }
1809                }
1810            }
1811        }
1812
1813        // We first enrich each collection description with some additional
1814        // metadata...
1815        let enriched_with_metadata = collections
1816            .into_iter()
1817            .map(|(id, description)| {
1818                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
1819
1820                // If the shard is being managed by txn-wal (initially,
1821                // tables), then we need to pass along the shard id for the txns
1822                // shard to dataflow rendering.
1823                let txns_shard = description
1824                    .data_source
1825                    .in_txns()
1826                    .then(|| *self.txns_read.txns_id());
1827
1828                let metadata = CollectionMetadata {
1829                    persist_location: self.persist_location.clone(),
1830                    data_shard,
1831                    relation_desc: description.desc.clone(),
1832                    txns_shard,
1833                };
1834
1835                Ok((id, description, metadata))
1836            })
1837            .collect_vec();
1838
1839        // So that we can open `SinceHandle`s for each collection concurrently.
1840        let persist_client = self
1841            .persist
1842            .open(self.persist_location.clone())
1843            .await
1844            .unwrap();
1845        let persist_client = &persist_client;
1846        // Work with a shared reference to `self`, since the concurrent work
1847        // processed in this stream cannot all have exclusive access.
1848        use futures::stream::{StreamExt, TryStreamExt};
1849        let this = &*self;
1850        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
1851            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
1852                let register_ts = register_ts.clone();
1853                async move {
1854                    let (id, description, metadata) = data?;
1855
1856                    // should be replaced with real introspection
1857                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
1858                    // but for now, it's helpful to have this mapping written down
1859                    // somewhere
1860                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);
1861
1862                    // If this collection has a primary, the primary is responsible for downgrading
1863                    // the critical since and it would be an error if we did so here while opening
1864                    // the since handle.
1865                    let since = if description.primary.is_some() {
1866                        None
1867                    } else {
1868                        description.since.as_ref()
1869                    };
1870
1871                    let (write, mut since_handle) = this
1872                        .open_data_handles(
1873                            &id,
1874                            metadata.data_shard,
1875                            since,
1876                            metadata.relation_desc.clone(),
1877                            persist_client,
1878                        )
1879                        .await;
1880
1881                    // Present tables as springing into existence at the register_ts
1882                    // by advancing the since. Otherwise, we could end up in a
1883                    // situation where a table with a long compaction window appears
1884                    // to exist before the environment (and thus the table) existed.
1885                    //
1886                    // We could potentially also do the same thing for other
1887                    // sources, in particular storage's internal sources and perhaps
1888                    // others, but leave them for now.
1889                    match description.data_source {
1890                        DataSource::Introspection(_)
1891                        | DataSource::IngestionExport { .. }
1892                        | DataSource::Webhook
1893                        | DataSource::Ingestion(_)
1894                        | DataSource::Progress
1895                        | DataSource::Other => {}
1896                        DataSource::Sink { .. } => {}
1897                        DataSource::Table => {
1898                            let register_ts = register_ts.expect(
1899                                "caller should have provided a register_ts when creating a table",
1900                            );
1901                            if since_handle.since().elements() == &[T::minimum()]
1902                                && !migrated_storage_collections.contains(&id)
1903                            {
1904                                debug!("advancing {} to initial since of {:?}", id, register_ts);
1905                                let token = since_handle.opaque();
1906                                let _ = since_handle
1907                                    .compare_and_downgrade_since(
1908                                        &token,
1909                                        (&token, &Antichain::from_elem(register_ts.clone())),
1910                                    )
1911                                    .await;
1912                            }
1913                        }
1914                    }
1915
1916                    Ok::<_, StorageError<Self::Timestamp>>((
1917                        id,
1918                        description,
1919                        write,
1920                        since_handle,
1921                        metadata,
1922                    ))
1923                }
1924            })
1925            // Poll each future for each collection concurrently, maximum of 50 at a time.
1926            .buffer_unordered(50)
1927            // HERE BE DRAGONS:
1928            //
1929            // There are at least 2 subtleties in using `FuturesUnordered`
1930            // (which `buffer_unordered` uses underneath):
1931            // - One is captured here
1932            //   <https://github.com/rust-lang/futures-rs/issues/2387>
1933            // - And the other is deadlocking if processing an OUTPUT of a
1934            //   `FuturesUnordered` stream attempts to obtain an async mutex that
1935            //   is also obtained in the futures being polled.
1936            //
1937            // Both of these could potentially be issues in all usages of
1938            // `buffer_unordered` in this method, so we stick to the standard
1939            // advice: only use `try_collect` or `collect`!
1940            .try_collect()
1941            .await?;
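        //
        // A minimal sketch of the second hazard above (hypothetical code, not
        // from this crate): a buffered future holds an async mutex across an
        // await while the consumer of the stream also wants that mutex.
        //
        //     let mut stream = futures::stream::iter(work)
        //         .map(|w| async {
        //             let _guard = lock.lock().await; // buffered future takes the lock
        //             do_io(w).await                  // ...and holds it across an await
        //         })
        //         .buffer_unordered(50);
        //     while let Some(out) = stream.next().await {
        //         let _guard = lock.lock().await;
        //         // If a pending buffered future currently holds the lock, it can
        //         // only release it when the stream is polled again, which never
        //         // happens while we block here waiting for the lock: deadlock.
        //     }
        //
        // Collecting the entire stream first, as `try_collect` does above,
        // takes no locks between polls and so sidesteps the problem.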
1942
1943        // Reorder in dependency order.
1944        #[derive(Ord, PartialOrd, Eq, PartialEq)]
1945        enum DependencyOrder {
1946            /// Tables should always be registered first, and large ids before small ones.
1947            Table(Reverse<GlobalId>),
1948            /// For most collections the id order is the correct one.
1949            Collection(GlobalId),
1950            /// Sinks should always be registered last.
1951            Sink(GlobalId),
1952        }
1953        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
1954            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
1955            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
1956            _ => DependencyOrder::Collection(*id),
1957        });
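        // For example (hypothetical ids), `[u1 (sink), u2 (table), u3 (source),
        // u4 (table)]` sorts to `[u4, u2, u3, u1]`: tables first in descending
        // id order, then other collections in ascending id order, sinks last.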
1958
1959        // We hold this lock for a very short amount of time, just doing some
1960        // hashmap inserts and unbounded channel sends.
1961        let mut self_collections = self.collections.lock().expect("lock poisoned");
1962
1963        for (id, description, write_handle, since_handle, metadata) in to_register {
1964            let write_frontier = write_handle.upper();
1965            let data_shard_since = since_handle.since().clone();
1966
1967            // Determine if this collection has any dependencies.
1968            let storage_dependencies =
1969                self.determine_collection_dependencies(&*self_collections, id, &description)?;
1970
1971            // Determine the initial since of the collection.
1972            let initial_since = match storage_dependencies
1973                .iter()
1974                .at_most_one()
1975                .expect("should have at most one dependency")
1976            {
1977                Some(dep) => {
1978                    let dependency_collection = self_collections
1979                        .get(dep)
1980                        .ok_or(StorageError::IdentifierMissing(*dep))?;
1981                    let dependency_since = dependency_collection.implied_capability.clone();
1982
1983                    // If an item has a dependency, its initial since must be
1984                    // advanced as far as its dependency's since, i.e. a dependency's
1985                    // since may never be in advance of its dependents' sinces.
1986                    //
1987                    // We have to do this every time we initialize the
1988                    // collection, though: the invariant might have been upheld
1989                    // correctly in the previous epoch, but the
1990                    // `data_shard_since` might not have compacted that far, so on
1991                    // establishing a new persist connection we may still see data
1992                    // we said _could_ be compacted.
1993                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
1994                        // The dependency since cannot be beyond the dependent
1995                        // (our) upper unless the collection is new. In
1996                        // practice, the dependency is the remap shard of a
1997                        // source (export), and if the since is allowed to
1998                        // "catch up" to the upper, that is `upper <= since`, a
1999                        // restarting ingestion cannot differentiate between
2000                        // updates that have already been written out to the
2001                        // backing persist shard and updates that have yet to be
2002                        // written. We would write duplicate updates.
2003                        //
2004                        // If this check fails, it means that the read hold
2005                        // installed on the dependency was probably not upheld;
2006                        // if it were, the dependency's since could not have
2007                        // advanced as far as the dependent's upper.
2008                        //
2009                        // We don't care about the dependency since when the
2010                        // write frontier is empty. In that case, no-one can
2011                        // write down any more updates.
2012                        mz_ore::soft_assert_or_log!(
2013                            write_frontier.elements() == &[T::minimum()]
2014                                || write_frontier.is_empty()
2015                                || PartialOrder::less_than(&dependency_since, write_frontier),
2016                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
2017                            dependent ({id}): since {:?}, upper {:?} \n
2018                            dependency ({dep}): since {:?}",
2019                            data_shard_since,
2020                            write_frontier,
2021                            dependency_since
2022                        );
2023
2024                        dependency_since
2025                    } else {
2026                        data_shard_since
2027                    }
2028                }
2029                None => data_shard_since,
2030            };
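            // A worked example with hypothetical frontiers: if the shard's
            // critical since is `[5]` but the dependency's implied capability
            // is already at `[10]`, we start this collection's since at `[10]`;
            // with no dependency (or a dependency at or behind `[5]`) we keep
            // the shard's `[5]`.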
2031
2032            let mut collection_state = CollectionState::new(
2033                description,
2034                initial_since,
2035                write_frontier.clone(),
2036                storage_dependencies,
2037                metadata.clone(),
2038            );
2039
2040            // Install the collection state in the appropriate spot.
2041            match &collection_state.description.data_source {
2042                DataSource::Introspection(_) => {
2043                    self_collections.insert(id, collection_state);
2044                }
2045                DataSource::Webhook => {
2046                    self_collections.insert(id, collection_state);
2047                }
2048                DataSource::IngestionExport {
2049                    ingestion_id,
2050                    details,
2051                    data_config,
2052                } => {
2053                    // Adjust the source to contain this export.
2054                    let source_collection = self_collections
2055                        .get_mut(ingestion_id)
2056                        .expect("known to exist");
2057                    match &mut source_collection.description {
2058                        CollectionDescription {
2059                            data_source: DataSource::Ingestion(ingestion_desc),
2060                            ..
2061                        } => ingestion_desc.source_exports.insert(
2062                            id,
2063                            SourceExport {
2064                                storage_metadata: (),
2065                                details: details.clone(),
2066                                data_config: data_config.clone(),
2067                            },
2068                        ),
2069                        _ => unreachable!(
2070                            "SourceExport must only refer to primary sources that already exist"
2071                        ),
2072                    };
2073
2074                    self_collections.insert(id, collection_state);
2075                }
2076                DataSource::Table => {
2077                    // See comment on self.initial_txn_upper on why we're doing
2078                    // this.
2079                    if is_in_txns(id, &metadata)
2080                        && PartialOrder::less_than(
2081                            &collection_state.write_frontier,
2082                            &self.initial_txn_upper,
2083                        )
2084                    {
2085                        // We could try and be cute and use the join of the txn
2086                        // upper and the table upper. But that has more
2087                        // complicated reasoning for why it is or isn't correct,
2088                        // and we're only dealing with totally ordered times
2089                        // here.
2090                        collection_state
2091                            .write_frontier
2092                            .clone_from(&self.initial_txn_upper);
2093                    }
2094                    self_collections.insert(id, collection_state);
2095                }
2096                DataSource::Progress | DataSource::Other => {
2097                    self_collections.insert(id, collection_state);
2098                }
2099                DataSource::Ingestion(_) => {
2100                    self_collections.insert(id, collection_state);
2101                }
2102                DataSource::Sink { .. } => {
2103                    self_collections.insert(id, collection_state);
2104                }
2105            }
2106
2107            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);
2108
2109            // If this collection has a dependency, install a read hold on it.
2110            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
2111        }
2112
2113        drop(self_collections);
2114
2115        self.synchronize_finalized_shards(storage_metadata);
2116
2117        Ok(())
2118    }
2119
2120    async fn alter_ingestion_source_desc(
2121        &self,
2122        ingestion_id: GlobalId,
2123        source_desc: SourceDesc,
2124    ) -> Result<(), StorageError<Self::Timestamp>> {
2125        // The StorageController checks the validity of these; we just
2126        // accept them here.
2127
2128        let mut self_collections = self.collections.lock().expect("lock poisoned");
2129        let collection = self_collections
2130            .get_mut(&ingestion_id)
2131            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
2132
2133        let curr_ingestion = match &mut collection.description.data_source {
2134            DataSource::Ingestion(active_ingestion) => active_ingestion,
2135            _ => unreachable!("verified collection refers to ingestion"),
2136        };
2137
2138        curr_ingestion.desc = source_desc;
2139        debug!("altered {ingestion_id}'s SourceDesc");
2140
2141        Ok(())
2142    }
2143
2144    async fn alter_ingestion_export_data_configs(
2145        &self,
2146        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
2147    ) -> Result<(), StorageError<Self::Timestamp>> {
2148        let mut self_collections = self.collections.lock().expect("lock poisoned");
2149
2150        for (source_export_id, new_data_config) in source_exports {
2151            // We need to adjust the data config on the CollectionState for
2152            // the source export collection directly
2153            let source_export_collection = self_collections
2154                .get_mut(&source_export_id)
2155                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2156            let ingestion_id = match &mut source_export_collection.description.data_source {
2157                DataSource::IngestionExport {
2158                    ingestion_id,
2159                    details: _,
2160                    data_config,
2161                } => {
2162                    *data_config = new_data_config.clone();
2163                    *ingestion_id
2164                }
2165                o => {
2166                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2167                    Err(StorageError::IdentifierInvalid(source_export_id))?
2168                }
2169            };
2170            // We also need to adjust the data config on the CollectionState of the
2171            // Ingestion that the export is associated with.
2172            let ingestion_collection = self_collections
2173                .get_mut(&ingestion_id)
2174                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
2175
2176            match &mut ingestion_collection.description.data_source {
2177                DataSource::Ingestion(ingestion_desc) => {
2178                    let source_export = ingestion_desc
2179                        .source_exports
2180                        .get_mut(&source_export_id)
2181                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2182
2183                    if source_export.data_config != new_data_config {
2184                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
2185                        source_export.data_config = new_data_config;
2186                    } else {
2187                        tracing::warn!(
2188                            "alter_ingestion_export_data_configs called on \
2189                                    export {source_export_id} of {ingestion_id} but \
2190                                    the data config was the same"
2191                        );
2192                    }
2193                }
2194                o => {
2195                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2196                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
2197                }
2198            }
2199        }
2200
2201        Ok(())
2202    }
2203
2204    async fn alter_ingestion_connections(
2205        &self,
2206        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
2207    ) -> Result<(), StorageError<Self::Timestamp>> {
2208        let mut self_collections = self.collections.lock().expect("lock poisoned");
2209
2210        for (id, conn) in source_connections {
2211            let collection = self_collections
2212                .get_mut(&id)
2213                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
2214
2215            match &mut collection.description.data_source {
2216                DataSource::Ingestion(ingestion) => {
2217                    // If the connection hasn't changed, there's no sense in
2218                    // re-rendering the dataflow.
2219                    if ingestion.desc.connection != conn {
2220                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
2221                        ingestion.desc.connection = conn;
2222                    } else {
2223                        warn!(
2224                            "alter_ingestion_connections called on {id} but the \
2225                            connection was the same"
2226                        );
2227                    }
2228                }
2229                o => {
2230                    warn!("alter_ingestion_connections called on {:?}", o);
2231                    Err(StorageError::IdentifierInvalid(id))?;
2232                }
2233            }
2234        }
2235
2236        Ok(())
2237    }
2238
2239    async fn alter_table_desc(
2240        &self,
2241        existing_collection: GlobalId,
2242        new_collection: GlobalId,
2243        new_desc: RelationDesc,
2244        expected_version: RelationVersion,
2245    ) -> Result<(), StorageError<Self::Timestamp>> {
2246        let data_shard = {
2247            let self_collections = self.collections.lock().expect("lock poisoned");
2248            let existing = self_collections
2249                .get(&existing_collection)
2250                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2251
2252            // TODO(alter_table): Support changes to sources.
2253            if existing.description.data_source != DataSource::Table {
2254                return Err(StorageError::IdentifierInvalid(existing_collection));
2255            }
2256
2257            existing.collection_metadata.data_shard
2258        };
2259
2260        let persist_client = self
2261            .persist
2262            .open(self.persist_location.clone())
2263            .await
2264            .unwrap();
2265
2266        // Evolve the schema of this shard.
2267        let diagnostics = Diagnostics {
2268            shard_name: existing_collection.to_string(),
2269            handle_purpose: "alter_table_desc".to_string(),
2270        };
2271        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2272        let expected_schema = expected_version.into();
2273        let schema_result = persist_client
2274            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2275                data_shard,
2276                expected_schema,
2277                &new_desc,
2278                &UnitSchema,
2279                diagnostics,
2280            )
2281            .await
2282            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2283        tracing::info!(
2284            ?existing_collection,
2285            ?new_collection,
2286            ?new_desc,
2287            "evolved schema"
2288        );
2289
2290        match schema_result {
2291            CaESchema::Ok(id) => id,
2292            // TODO(alter_table): If we get an expected mismatch we should retry.
2293            CaESchema::ExpectedMismatch {
2294                schema_id,
2295                key,
2296                val,
2297            } => {
2298                mz_ore::soft_panic_or_log!(
2299                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2300                );
2301                return Err(StorageError::Generic(anyhow::anyhow!(
2302                    "schema expectation mismatch, {existing_collection:?}",
2303                )));
2304            }
2305            CaESchema::Incompatible => {
2306                mz_ore::soft_panic_or_log!(
2307                    "incompatible schema! {existing_collection} {new_desc:?}"
2308                );
2309                return Err(StorageError::Generic(anyhow::anyhow!(
2310                    "schema incompatible, {existing_collection:?}"
2311                )));
2312            }
2313        };
2314
2315        // Once the new schema is registered we can open new data handles.
2316        let (write_handle, since_handle) = self
2317            .open_data_handles(
2318                &new_collection,
2319                data_shard,
2320                None,
2321                new_desc.clone(),
2322                &persist_client,
2323            )
2324            .await;
2325
2326        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2327        // new version was registered with txn-wal?
2328
2329        // Great! Our new schema is registered with Persist; now we need to update our
2330        // internal data structures.
2331        {
2332            let mut self_collections = self.collections.lock().expect("lock poisoned");
2333
2334            // Update the existing collection so we know it's a "projection" of this new one.
2335            let existing = self_collections
2336                .get_mut(&existing_collection)
2337                .expect("existing collection missing");
2338
2339            // A higher level should already be asserting this, but let's make sure.
2340            assert_eq!(existing.description.data_source, DataSource::Table);
2341            assert_none!(existing.description.primary);
2342
2343            // The existing version of the table will depend on the new version.
2344            existing.description.primary = Some(new_collection);
2345            existing.storage_dependencies.push(new_collection);
2346
2347            // Copy over the frontiers from the previous version.
2348            // The new table starts with two holds - the implied capability, and the hold from
2349            // the previous version - both at the previous version's read frontier.
2350            let implied_capability = existing.read_capabilities.frontier().to_owned();
2351            let write_frontier = existing.write_frontier.clone();
2352
2353            // Determine the relevant read capabilities on the new collection.
2354            //
2355            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2356            // here, but that only installed a ReadHold on the new collection for the implied
2357            // capability of the existing collection. This would cause runtime panics because it
2358            // would eventually result in negative read capabilities.
2359            let mut changes = ChangeBatch::new();
2360            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
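            // For instance (hypothetical frontier), if the previous version's
            // read frontier is `[42]`, `changes` is `[(42, +1)]`; together with
            // the implied capability installed by `CollectionState::new` below,
            // the new collection starts with a capability count of 2 at time 42,
            // matching the "two holds" described above.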
2361
2362            // Note: The new collection is now the "primary collection".
2363            let collection_desc = CollectionDescription::for_table(new_desc.clone());
2364            let collection_meta = CollectionMetadata {
2365                persist_location: self.persist_location.clone(),
2366                relation_desc: collection_desc.desc.clone(),
2367                data_shard,
2368                txns_shard: Some(self.txns_read.txns_id().clone()),
2369            };
2370            let collection_state = CollectionState::new(
2371                collection_desc,
2372                implied_capability,
2373                write_frontier,
2374                Vec::new(),
2375                collection_meta,
2376            );
2377
2378            // Add a record of the new collection.
2379            self_collections.insert(new_collection, collection_state);
2380
2381            let mut updates = BTreeMap::from([(new_collection, changes)]);
2382            StorageCollectionsImpl::update_read_capabilities_inner(
2383                &self.cmd_tx,
2384                &mut *self_collections,
2385                &mut updates,
2386            );
2387        };
2388
2389        // TODO(alter_table): Support changes to sources.
2390        self.register_handles(new_collection, true, since_handle, write_handle);
2391
2392        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2393
2394        Ok(())
2395    }
2396
2397    fn drop_collections_unvalidated(
2398        &self,
2399        storage_metadata: &StorageMetadata,
2400        identifiers: Vec<GlobalId>,
2401    ) {
2402        debug!(?identifiers, "drop_collections_unvalidated");
2403
2404        let mut self_collections = self.collections.lock().expect("lock poisoned");
2405
2406        for id in identifiers.iter() {
2407            let metadata = storage_metadata.get_collection_shard::<T>(*id);
2408            mz_ore::soft_assert_or_log!(
2409                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2410                "dropping {id}, but drop was not synchronized with storage \
2411                controller via `synchronize_collections`"
2412            );
2413
2414            let dropped_data_source = match self_collections.get(id) {
2415                Some(col) => col.description.data_source.clone(),
2416                None => continue,
2417            };
2418
2419            // If we are dropping a source export, we need to modify the
2420            // ingestion that it runs on.
2421            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
2422                // Adjust the source to remove this export.
2423                let ingestion = match self_collections.get_mut(&ingestion_id) {
2424                    Some(ingestion) => ingestion,
2425                    // Primary ingestion already dropped.
2426                    None => {
2427                        tracing::error!(
2428                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
2429                        );
2430                        continue;
2431                    }
2432                };
2433
2434                match &mut ingestion.description {
2435                    CollectionDescription {
2436                        data_source: DataSource::Ingestion(ingestion_desc),
2437                        ..
2438                    } => {
2439                        let removed = ingestion_desc.source_exports.remove(id);
2440                        mz_ore::soft_assert_or_log!(
2441                            removed.is_some(),
2442                            "dropped subsource {id} already removed from source exports"
2443                        );
2444                    }
2445                    _ => unreachable!(
2446                        "SourceExport must only refer to primary sources that already exist"
2447                    ),
2448                };
2449            }
2450        }
2451
2452        // Install policies that advance the since to the empty antichain. We
2453        // still honor outstanding read holds, and collections will only be
2454        // dropped once those are removed as well.
2455        //
2456        // We don't explicitly remove read capabilities! Downgrading the
2457        // frontier of the source to `[]` (the empty Antichain) will propagate
2458        // to the storage dependencies.
2459        let mut finalized_policies = Vec::new();
2460
2461        for id in identifiers {
2462            // Make sure it's still there, might already have been deleted.
2463            if self_collections.contains_key(&id) {
2464                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2465            }
2466        }
2467        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2468
2469        drop(self_collections);
2470
2471        self.synchronize_finalized_shards(storage_metadata);
2472    }
2473
2474    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2475        let mut collections = self.collections.lock().expect("lock poisoned");
2476
2477        if tracing::enabled!(tracing::Level::TRACE) {
2478            let user_capabilities = collections
2479                .iter_mut()
2480                .filter(|(id, _c)| id.is_user())
2481                .map(|(id, c)| {
2482                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2483                    (*id, c.implied_capability.clone(), updates)
2484                })
2485                .collect_vec();
2486
2487            trace!(?policies, ?user_capabilities, "set_read_policies");
2488        }
2489
2490        self.set_read_policies_inner(&mut collections, policies);
2491
2492        if tracing::enabled!(tracing::Level::TRACE) {
2493            let user_capabilities = collections
2494                .iter_mut()
2495                .filter(|(id, _c)| id.is_user())
2496                .map(|(id, c)| {
2497                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2498                    (*id, c.implied_capability.clone(), updates)
2499                })
2500                .collect_vec();
2501
2502            trace!(?user_capabilities, "after! set_read_policies");
2503        }
2504    }
2505
2506    fn acquire_read_holds(
2507        &self,
2508        desired_holds: Vec<GlobalId>,
2509    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
2510        if desired_holds.is_empty() {
2511            return Ok(vec![]);
2512        }
2513
2514        let mut collections = self.collections.lock().expect("lock poisoned");
2515
2516        let mut advanced_holds = Vec::new();
2517        // We advance the holds to our current since frontier. We can't acquire
2518        // holds for times that have been compacted away!
2519        //
2520        // NOTE: We acquire read holds at the earliest possible time rather than
2521        // at the implied capability. This is so that, for example, adapter can
2522        // acquire a read hold to hold back the frontier, giving the COMPUTE
2523        // controller a chance to also acquire a read hold at that early
2524        // frontier. If/when we change the interplay between adapter and COMPUTE
2525        // to pass around ReadHold tokens, we might tighten this up and instead
2526        // acquire read holds at the implied capability.
2527        for id in desired_holds.iter() {
2528            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
2529            let since = collection.read_capabilities.frontier().to_owned();
2530            advanced_holds.push((*id, since));
2531        }
2532
2533        let mut updates = advanced_holds
2534            .iter()
2535            .map(|(id, hold)| {
2536                let mut changes = ChangeBatch::new();
2537                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2538                (*id, changes)
2539            })
2540            .collect::<BTreeMap<_, _>>();
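        // As an illustration with a hypothetical frontier: a hold acquired at
        // since `[5]` contributes a change batch of `[(5, +1)]`, bumping that
        // collection's capability count at time 5 by one.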
2541
2542        StorageCollectionsImpl::update_read_capabilities_inner(
2543            &self.cmd_tx,
2544            &mut collections,
2545            &mut updates,
2546        );
2547
2548        let acquired_holds = advanced_holds
2549            .into_iter()
2550            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2551            .collect_vec();
2552
2553        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2554
2555        Ok(acquired_holds)
2556    }
2557
2558    /// Determine time dependence information for the object.
2559    fn determine_time_dependence(
2560        &self,
2561        id: GlobalId,
2562    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2563        use TimeDependenceError::CollectionMissing;
2564        let collections = self.collections.lock().expect("lock poisoned");
2565        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);
2566
2567        let mut result = None;
2568
2569        while let Some(c) = collection.take() {
2570            use DataSource::*;
2571            if let Some(timeline) = &c.description.timeline {
2572                // Only the epoch timeline follows wall-clock.
2573                if *timeline != Timeline::EpochMilliseconds {
2574                    break;
2575                }
2576            }
2577            match &c.description.data_source {
2578                Ingestion(ingestion) => {
2579                    use GenericSourceConnection::*;
2580                    match ingestion.desc.connection {
2581                        // Kafka, Postgres, MySql, and SQL Server sources all
2582                        // follow wall clock.
2583                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
2584                            result = Some(TimeDependence::default())
2585                        }
2586                        // Load generators' time dependence is not further specified.
2587                        LoadGenerator(_) => {}
2588                    }
2589                }
2590                IngestionExport { ingestion_id, .. } => {
2591                    let c = collections
2592                        .get(ingestion_id)
2593                        .ok_or(CollectionMissing(*ingestion_id))?;
2594                    collection = Some(c);
2595                }
2596                // Introspection, progress, table, and webhook sources follow wall clock.
2597                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2598                    result = Some(TimeDependence::default())
2599                }
2600                // Materialized views, continual tasks, etc., aren't managed by storage.
2601                Other => {}
2602                Sink { .. } => {}
2603            };
2604        }
2605        Ok(result)
2606    }
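
    // For example (illustrative): an ingestion export of a Kafka source
    // resolves through its parent ingestion to `Some(TimeDependence::default())`,
    // i.e. it follows wall-clock time, while a load-generator source yields
    // `None`.
    //
    //     let dep = storage_collections.determine_time_dependence(export_id)?;
    //     assert_eq!(dep, Some(TimeDependence::default()));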
2607
2608    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2609        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2610        let Self {
2611            envd_epoch,
2612            read_only,
2613            finalizable_shards,
2614            finalized_shards,
2615            collections,
2616            txns_read: _,
2617            config,
2618            initial_txn_upper,
2619            persist_location,
2620            persist: _,
2621            cmd_tx: _,
2622            holds_tx: _,
2623            _background_task: _,
2624            _finalize_shards_task: _,
2625        } = self;
2626
2627        let finalizable_shards: Vec<_> = finalizable_shards
2628            .lock()
2629            .iter()
2630            .map(ToString::to_string)
2631            .collect();
2632        let finalized_shards: Vec<_> = finalized_shards
2633            .lock()
2634            .iter()
2635            .map(ToString::to_string)
2636            .collect();
2637        let collections: BTreeMap<_, _> = collections
2638            .lock()
2639            .expect("poisoned")
2640            .iter()
2641            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2642            .collect();
2643        let config = format!("{:?}", config.lock().expect("poisoned"));
2644
2645        Ok(serde_json::json!({
2646            "envd_epoch": envd_epoch,
2647            "read_only": read_only,
2648            "finalizable_shards": finalizable_shards,
2649            "finalized_shards": finalized_shards,
2650            "collections": collections,
2651            "config": config,
2652            "initial_txn_upper": initial_txn_upper,
2653            "persist_location": format!("{persist_location:?}"),
2654        }))
2655    }
2656}
2657
2658/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
2659///
2660/// When a [StorageCollections] is in read-only mode, we will only ever acquire
2661/// a [ReadHandle], because acquiring a [SinceHandle] and driving forward its
2662/// since is considered a write. Conversely, when in read-write mode, we acquire
2663/// a [SinceHandle].
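///
/// A minimal sketch of how callers treat the two variants uniformly (assuming
/// a `wrapper: &mut SinceHandleWrapper<T>` and a target `new_since:
/// Antichain<T>` are already in hand; illustrative only):
///
/// ```ignore
/// let epoch = wrapper.opaque();
/// let result = wrapper
///     .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
///     .await;
/// ```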
2664#[derive(Debug)]
2665enum SinceHandleWrapper<T>
2666where
2667    T: TimelyTimestamp + Lattice + Codec64,
2668{
2669    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
2670    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
2671}
2672
2673impl<T> SinceHandleWrapper<T>
2674where
2675    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2676{
2677    pub fn since(&self) -> &Antichain<T> {
2678        match self {
2679            Self::Critical(handle) => handle.since(),
2680            Self::Leased(handle) => handle.since(),
2681        }
2682    }
2683
2684    pub fn opaque(&self) -> PersistEpoch {
2685        match self {
2686            Self::Critical(handle) => handle.opaque().clone(),
2687            Self::Leased(_handle) => {
2688                // The opaque is expected to be used with
2689                // `compare_and_downgrade_since`, but the leased handle doesn't
2690                // have a notion of an opaque. We pretend that it does, both here
2691                // and in `compare_and_downgrade_since`.
2692                PersistEpoch(None)
2693            }
2694        }
2695    }
2696
2697    pub async fn compare_and_downgrade_since(
2698        &mut self,
2699        expected: &PersistEpoch,
2700        new: (&PersistEpoch, &Antichain<T>),
2701    ) -> Result<Antichain<T>, PersistEpoch> {
2702        match self {
2703            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
2704            Self::Leased(handle) => {
2705                let (opaque, since) = new;
2706                assert_none!(opaque.0);
2707
2708                handle.downgrade_since(since).await;
2709
2710                Ok(since.clone())
2711            }
2712        }
2713    }
2714
2715    pub async fn maybe_compare_and_downgrade_since(
2716        &mut self,
2717        expected: &PersistEpoch,
2718        new: (&PersistEpoch, &Antichain<T>),
2719    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2720        match self {
2721            Self::Critical(handle) => {
2722                handle
2723                    .maybe_compare_and_downgrade_since(expected, new)
2724                    .await
2725            }
2726            Self::Leased(handle) => {
2727                let (opaque, since) = new;
2728                assert_none!(opaque.0);
2729
2730                handle.maybe_downgrade_since(since).await;
2731
2732                Some(Ok(since.clone()))
2733            }
2734        }
2735    }
2736
2737    pub fn snapshot_stats(
2738        &self,
2739        id: GlobalId,
2740        as_of: Option<Antichain<T>>,
2741    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2742        match self {
2743            Self::Critical(handle) => {
2744                let res = handle
2745                    .snapshot_stats(as_of)
2746                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2747                Box::pin(res)
2748            }
2749            Self::Leased(handle) => {
2750                let res = handle
2751                    .snapshot_stats(as_of)
2752                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2753                Box::pin(res)
2754            }
2755        }
2756    }
2757
2758    pub fn snapshot_stats_from_txn(
2759        &self,
2760        id: GlobalId,
2761        data_snapshot: DataSnapshot<T>,
2762    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2763        match self {
2764            Self::Critical(handle) => Box::pin(
2765                data_snapshot
2766                    .snapshot_stats_from_critical(handle)
2767                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2768            ),
2769            Self::Leased(handle) => Box::pin(
2770                data_snapshot
2771                    .snapshot_stats_from_leased(handle)
2772                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2773            ),
2774        }
2775    }
2776}
2777
2778/// State maintained about individual collections.
2779#[derive(Debug, Clone)]
2780struct CollectionState<T> {
2781    /// Description with which the collection was created
2782    pub description: CollectionDescription<T>,
2783
2784    /// Accumulation of read capabilities for the collection.
2785    ///
2786    /// This accumulation will always contain `self.implied_capability`, but may
2787    /// also contain capabilities held by others who have read dependencies on
2788    /// this collection.
2789    pub read_capabilities: MutableAntichain<T>,
2790
2791    /// The implicit capability associated with collection creation. This
2792    /// should never be less than the since of the associated persist
2793    /// collection.
2794    pub implied_capability: Antichain<T>,
2795
2796    /// The policy to use to downgrade `self.implied_capability`.
2797    pub read_policy: ReadPolicy<T>,
2798
2799    /// Storage identifiers on which this collection depends.
2800    pub storage_dependencies: Vec<GlobalId>,
2801
2802    /// Reported write frontier.
2803    pub write_frontier: Antichain<T>,
2804
2805    pub collection_metadata: CollectionMetadata,
2806}
2807
2808impl<T: TimelyTimestamp> CollectionState<T> {
2809    /// Creates a new collection state, with an initial read policy valid from
2810    /// `since`.
2811    pub fn new(
2812        description: CollectionDescription<T>,
2813        since: Antichain<T>,
2814        write_frontier: Antichain<T>,
2815        storage_dependencies: Vec<GlobalId>,
2816        metadata: CollectionMetadata,
2817    ) -> Self {
2818        let mut read_capabilities = MutableAntichain::new();
2819        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2820        Self {
2821            description,
2822            read_capabilities,
2823            implied_capability: since.clone(),
2824            read_policy: ReadPolicy::NoPolicy {
2825                initial_since: since,
2826            },
2827            storage_dependencies,
2828            write_frontier,
2829            collection_metadata: metadata,
2830        }
2831    }
2832
2833    /// Returns whether the collection was dropped.
2834    pub fn is_dropped(&self) -> bool {
2835        self.read_capabilities.is_empty()
2836    }
2837}
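
// A sketch of the capability accounting above (illustrative, not tied to a
// real collection): `new` seeds `read_capabilities` with a count of +1 at
// every time in `since`, further read holds add more counts, and the
// collection counts as dropped once every count has been retracted.
//
//     let mut caps = MutableAntichain::new();
//     caps.update_iter([(5u64, 1)]); // implied capability
//     caps.update_iter([(7u64, 1)]); // some reader's hold
//     assert_eq!(caps.frontier().to_owned(), Antichain::from_elem(5));
//     caps.update_iter([(5u64, -1), (7u64, -1)]);
//     assert!(caps.is_empty()); // `is_dropped()` would now return true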
2838
2839/// A task that keeps persist handles, downgrades sinces when asked,
2840/// periodically gets recent uppers from them, and updates the shared collection
2841/// state when needed.
2842///
2843/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
2844#[derive(Debug)]
2845struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
2846    config: Arc<Mutex<StorageConfiguration>>,
2847    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
2848    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
2849    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
2850    finalizable_shards: Arc<ShardIdSet>,
2851    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
2852    // So we know which shard ID corresponds to which global ID, which we need
2853    // when re-enqueuing futures for determining the next upper update.
2854    shard_by_id: BTreeMap<GlobalId, ShardId>,
2855    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
2856    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
2857    txns_shards: BTreeSet<GlobalId>,
2858}
2859
2860#[derive(Debug)]
2861enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
2862    Register {
2863        id: GlobalId,
2864        is_in_txns: bool,
2865        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
2866        since_handle: SinceHandleWrapper<T>,
2867    },
2868    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
2869    SnapshotStats(
2870        GlobalId,
2871        SnapshotStatsAsOf<T>,
2872        oneshot::Sender<SnapshotStatsRes<T>>,
2873    ),
2874}
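
// A sketch of how commands reach this task (hypothetical call site; the real
// senders live in `StorageCollectionsImpl`, which holds a clone of `cmds_tx`):
// commands are fire-and-forget over the unbounded channel.
//
//     let _ = cmd_tx.send(BackgroundCmd::DowngradeSince(vec![(id, new_since)]));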
2875
2876/// A newtype wrapper to hang a Debug impl off of.
2877pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2878
2879impl<T> Debug for SnapshotStatsRes<T> {
2880    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2881        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2882    }
2883}
2884
2885impl<T> BackgroundTask<T>
2886where
2887    T: TimelyTimestamp
2888        + Lattice
2889        + Codec64
2890        + From<EpochMillis>
2891        + TimestampManipulation
2892        + Into<mz_repr::Timestamp>
2893        + Sync,
2894{
2895    async fn run(&mut self) {
2896        // Futures that fetch the recent upper of all shards other than the txns shard.
2897        let mut upper_futures: FuturesUnordered<
2898            std::pin::Pin<
2899                Box<
2900                    dyn Future<
2901                            Output = (
2902                                GlobalId,
2903                                WriteHandle<SourceData, (), T, StorageDiff>,
2904                                Antichain<T>,
2905                            ),
2906                        > + Send,
2907                >,
2908            >,
2909        > = FuturesUnordered::new();
2910
2911        let gen_upper_future =
2912            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
2913                let fut = async move {
2914                    soft_assert_or_log!(
2915                        !prev_upper.is_empty(),
2916                        "cannot await progress when upper is already empty"
2917                    );
2918                    handle.wait_for_upper_past(&prev_upper).await;
2919                    let new_upper = handle.shared_upper();
2920                    (id, handle, new_upper)
2921                };
2922
2923                fut
2924            };
2925
2926        let mut txns_upper_future = match self.txns_handle.take() {
2927            Some(txns_handle) => {
2928                let upper = txns_handle.upper().clone();
2929                let txns_upper_future =
2930                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
2931                txns_upper_future.boxed()
2932            }
2933            None => async { std::future::pending().await }.boxed(),
2934        };
2935
2936        loop {
2937            tokio::select! {
2938                (id, handle, upper) = &mut txns_upper_future => {
2939                    trace!("new upper from txns shard: {:?}", upper);
2940                    let mut uppers = Vec::new();
2941                    for id in self.txns_shards.iter() {
2942                        uppers.push((*id, &upper));
2943                    }
2944                    self.update_write_frontiers(&uppers).await;
2945
2946                    let fut = gen_upper_future(id, handle, upper);
2947                    txns_upper_future = fut.boxed();
2948                }
2949                Some((id, handle, upper)) = upper_futures.next() => {
2950                    if id.is_user() {
2951                        trace!("new upper for collection {id}: {:?}", upper);
2952                    }
2953                    let current_shard = self.shard_by_id.get(&id);
2954                    if let Some(shard_id) = current_shard {
2955                        if shard_id == &handle.shard_id() {
2956                            // Still current, so process the update and enqueue
2957                            // again!
2958                            let uppers = &[(id, &upper)];
2959                            self.update_write_frontiers(uppers).await;
2960                            if !upper.is_empty() {
2961                                let fut = gen_upper_future(id, handle, upper);
2962                                upper_futures.push(fut.boxed());
2963                            }
2964                        } else {
2965                            // Be polite and expire the write handle. This can
2966                            // happen when we get an upper update for a write
2967                            // handle that has since been replaced.
2968                            handle.expire().await;
2969                        }
2970                    }
2971                }
2972                cmd = self.cmds_rx.recv() => {
2973                    let cmd = if let Some(cmd) = cmd {
2974                        cmd
2975                    } else {
2976                        // We're done!
2977                        break;
2978                    };
2979
2980                    match cmd {
2981                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
2982                            debug!("registering handles for {}", id);
2983                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
2984                            if previous.is_some() {
2985                                panic!("already registered a WriteHandle for collection {id}");
2986                            }
2987
2988                            let previous = self.since_handles.insert(id, since_handle);
2989                            if previous.is_some() {
2990                                panic!("already registered a SinceHandle for collection {id}");
2991                            }
2992
2993                            if is_in_txns {
2994                                self.txns_shards.insert(id);
2995                            } else {
2996                                let upper = write_handle.upper().clone();
2997                                if !upper.is_empty() {
2998                                    let fut = gen_upper_future(id, write_handle, upper);
2999                                    upper_futures.push(fut.boxed());
3000                                }
3001                            }
3002
3003                        }
3004                        BackgroundCmd::DowngradeSince(cmds) => {
3005                            self.downgrade_sinces(cmds).await;
3006                        }
3007                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
3008                            // NB: The requested as_of could be arbitrarily far
3009                            // in the future. So, in order to avoid blocking
3010                            // this loop until it's available and the
3011                            // `snapshot_stats` call resolves, instead return
3012                            // the future to the caller and await it there.
3013                            let res = match self.since_handles.get(&id) {
3014                                Some(x) => {
3015                                    let fut: BoxFuture<
3016                                        'static,
3017                                        Result<SnapshotStats, StorageError<T>>,
3018                                    > = match as_of {
3019                                        SnapshotStatsAsOf::Direct(as_of) => {
3020                                            x.snapshot_stats(id, Some(as_of))
3021                                        }
3022                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
3023                                            x.snapshot_stats_from_txn(id, data_snapshot)
3024                                        }
3025                                    };
3026                                    SnapshotStatsRes(fut)
3027                                }
3028                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
3029                                    StorageError::IdentifierMissing(id),
3030                                )))),
3031                            };
3032                            // It's fine if the listener hung up.
3033                            let _ = tx.send(res);
3034                        }
3035                    }
3036                }
3037                Some(holds_changes) = self.holds_rx.recv() => {
3038                    let mut batched_changes = BTreeMap::new();
3039                    batched_changes.insert(holds_changes.0, holds_changes.1);
3040
3041                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
3042                        let entry = batched_changes.entry(holds_changes.0);
3043                        entry
3044                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
3045                            .or_insert_with(|| holds_changes.1);
3046                    }
3047
3048                    let mut collections = self.collections.lock().expect("lock poisoned");
3049
3050                    let user_changes = batched_changes
3051                        .iter()
3052                        .filter(|(id, _c)| id.is_user())
3053                        .map(|(id, c)| {
3054                            (id.clone(), c.clone())
3055                        })
3056                        .collect_vec();
3057
3058                    if !user_changes.is_empty() {
3059                        trace!(?user_changes, "applying holds changes from channel");
3060                    }
3061
3062                    StorageCollectionsImpl::update_read_capabilities_inner(
3063                        &self.cmds_tx,
3064                        &mut collections,
3065                        &mut batched_changes,
3066                    );
3067                }
3068            }
3069        }
3070
3071        warn!("BackgroundTask shutting down");
3072    }
3073
3074    #[instrument(level = "debug")]
3075    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
3076        let mut read_capability_changes = BTreeMap::default();
3077
3078        let mut self_collections = self.collections.lock().expect("lock poisoned");
3079
3080        for (id, new_upper) in updates.iter() {
3081            let collection = if let Some(c) = self_collections.get_mut(id) {
3082                c
3083            } else {
3084                trace!(
3085                    "Reference to absent collection {id}, due to concurrent removal of that collection"
3086                );
3087                continue;
3088            };
3089
3090            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
3091                collection.write_frontier.clone_from(new_upper);
3092            }
3093
3094            let mut new_read_capability = collection
3095                .read_policy
3096                .frontier(collection.write_frontier.borrow());
3097
3098            if id.is_user() {
3099                trace!(
3100                    %id,
3101                    implied_capability = ?collection.implied_capability,
3102                    policy = ?collection.read_policy,
3103                    write_frontier = ?collection.write_frontier,
3104                    ?new_read_capability,
3105                    "update_write_frontiers");
3106            }
3107
3108            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
3109                let mut update = ChangeBatch::new();
3110                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
3111                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
3112                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
3113
3114                if !update.is_empty() {
3115                    read_capability_changes.insert(*id, update);
3116                }
3117            }
3118        }
3119
3120        if !read_capability_changes.is_empty() {
3121            StorageCollectionsImpl::update_read_capabilities_inner(
3122                &self.cmds_tx,
3123                &mut self_collections,
3124                &mut read_capability_changes,
3125            );
3126        }
3127    }
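
    // Worked example for the above (illustrative numbers): with a read policy
    // that lags the write frontier by 10, a write frontier advancing to {100}
    // yields a `new_read_capability` of {90}. The swap then produces a
    // ChangeBatch of {(90, +1), (old implied capability, -1)}, which
    // `update_read_capabilities_inner` applies to `read_capabilities`,
    // forwarding any resulting since changes through `cmds_tx`.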
3128
3129    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
3130        for (id, new_since) in cmds {
3131            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
3132                c
3133            } else {
3134                // This can happen when someone concurrently drops a collection.
3135                trace!("downgrade_sinces: reference to absent collection {id}");
3136                continue;
3137            };
3138
3139            if id.is_user() {
3140                trace!("downgrading since of {} to {:?}", id, new_since);
3141            }
3142
3143            let epoch = since_handle.opaque();
3144            let result = if new_since.is_empty() {
3145                // A shard's since reaching the empty frontier is a prereq for
3146                // being able to finalize a shard, so the final downgrade should
3147                // never be rate-limited.
3148                let res = Some(
3149                    since_handle
3150                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3151                        .await,
3152                );
3153
3154                info!(%id, "removing persist handles because the since advanced to []!");
3155
3156                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
3157                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
3158                    shard_id
3159                } else {
3160                    panic!("missing GlobalId -> ShardId mapping for id {id}");
3161                };
3162
3163                // We're not responsible for writes to tables, so we also don't
3164                // de-register them from the txn system. Whoever is responsible
3165                // will remove them. We only make sure to remove the table from
3166                // our tracking.
3167                self.txns_shards.remove(&id);
3168
3169                if !self
3170                    .config
3171                    .lock()
3172                    .expect("lock poisoned")
3173                    .parameters
3174                    .finalize_shards
3175                {
3176                    info!(
3177                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3178                    );
3179                    return;
3180                }
3181
3182                info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");
3183
3184                self.finalizable_shards.lock().insert(dropped_shard_id);
3185
3186                res
3187            } else {
3188                since_handle
3189                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3190                    .await
3191            };
3192
3193            if let Some(Err(other_epoch)) = result {
3194                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
3195            }
3196        }
3197    }
3198}
3199
3200struct FinalizeShardsTaskConfig {
3201    envd_epoch: NonZeroI64,
3202    config: Arc<Mutex<StorageConfiguration>>,
3203    metrics: StorageCollectionsMetrics,
3204    finalizable_shards: Arc<ShardIdSet>,
3205    finalized_shards: Arc<ShardIdSet>,
3206    persist_location: PersistLocation,
3207    persist: Arc<PersistClientCache>,
3208    read_only: bool,
3209}
3210
3211async fn finalize_shards_task<T>(
3212    FinalizeShardsTaskConfig {
3213        envd_epoch,
3214        config,
3215        metrics,
3216        finalizable_shards,
3217        finalized_shards,
3218        persist_location,
3219        persist,
3220        read_only,
3221    }: FinalizeShardsTaskConfig,
3222) where
3223    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
3224{
3225    if read_only {
3226        info!("disabling shard finalization in read only mode");
3227        return;
3228    }
3229
3230    let mut interval = tokio::time::interval(Duration::from_secs(5));
3231    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3232    loop {
3233        interval.tick().await;
3234
3235        if !config
3236            .lock()
3237            .expect("lock poisoned")
3238            .parameters
3239            .finalize_shards
3240        {
3241            debug!(
3242                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3243            );
3244            continue;
3245        }
3246
3247        let current_finalizable_shards = {
3248            // We hold the lock for as short a time as possible and pull out a
3249            // cloned set of shards.
3250            finalizable_shards.lock().iter().cloned().collect_vec()
3251        };
3252
3253        if current_finalizable_shards.is_empty() {
3254            debug!("no shards to finalize");
3255            continue;
3256        }
3257
3258        debug!(?current_finalizable_shards, "attempting to finalize shards");
3259
3260        // Open a persist client to delete unused shards.
3261        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3262
3263        let metrics = &metrics;
3264        let finalizable_shards = &finalizable_shards;
3265        let finalized_shards = &finalized_shards;
3266        let persist_client = &persist_client;
3267        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3268
3269        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3270            .get(config.lock().expect("lock poisoned").config_set());
3271
3272        let epoch = &PersistEpoch::from(envd_epoch);
3273
3274        futures::stream::iter(current_finalizable_shards.clone())
3275            .map(|shard_id| async move {
3276                let persist_client = persist_client.clone();
3277                let diagnostics = diagnostics.clone();
3278                let epoch = epoch.clone();
3279
3280                metrics.finalization_started.inc();
3281
3282                let is_finalized = persist_client
3283                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3284                    .await
3285                    .expect("invalid persist usage");
3286
3287                if is_finalized {
3288                    debug!(%shard_id, "shard is already finalized!");
3289                    Some(shard_id)
3290                } else {
3291                    debug!(%shard_id, "finalizing shard");
3292                    let finalize = || async move {
3293                        // TODO: thread the global ID into the shard finalization WAL
3294                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3295
3296                        // We only use the writer to advance the upper, so using a dummy schema is
3297                        // fine.
3298                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3299                            persist_client
3300                                .open_writer(
3301                                    shard_id,
3302                                    Arc::new(RelationDesc::empty()),
3303                                    Arc::new(UnitSchema),
3304                                    diagnostics,
3305                                )
3306                                .await
3307                                .expect("invalid persist usage");
3308                        write_handle.advance_upper(&Antichain::new()).await;
3309                        write_handle.expire().await;
3310
3311                        if force_downgrade_since {
3312                            let mut since_handle: SinceHandle<
3313                                SourceData,
3314                                (),
3315                                T,
3316                                StorageDiff,
3317                                PersistEpoch,
3318                            > = persist_client
3319                                .open_critical_since(
3320                                    shard_id,
3321                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
3322                                    Diagnostics::from_purpose("finalizing shards"),
3323                                )
3324                                .await
3325                                .expect("invalid persist usage");
3326                            let handle_epoch = since_handle.opaque().clone();
3327                            let our_epoch = epoch.clone();
3328                            let epoch = if our_epoch.0 > handle_epoch.0 {
3329                                // We're newer, but it's fine to use the
3330                                // handle's old epoch to try and downgrade.
3331                                handle_epoch
3332                            } else {
3333                                // Good luck, buddy! The downgrade below will
3334                                // not succeed. There's a process with a newer
3335                                // epoch out there and someone at some juncture
3336                                // will fence out this process.
3337                                our_epoch
3338                            };
3339                            let new_since = Antichain::new();
3340                            let downgrade = since_handle
3341                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3342                                .await;
3343                            if let Err(e) = downgrade {
3344                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
3345                                return Ok(());
3346                            }
3347                            // Not available now, so finalization is broken.
3348                            // since_handle.expire().await;
3349                        }
3350
3351                        persist_client
3352                            .finalize_shard::<SourceData, (), T, StorageDiff>(
3353                                shard_id,
3354                                Diagnostics::from_purpose("finalizing shards"),
3355                            )
3356                            .await
3357                    };
3358
3359                    match finalize().await {
3360                        Err(e) => {
3361                            // Rather than error, just leave this shard as
3362                            // one to finalize later.
3363                            warn!("error during finalization of shard {shard_id}: {e:?}");
3364                            None
3365                        }
3366                        Ok(()) => {
3367                            debug!(%shard_id, "finalize success!");
3368                            Some(shard_id)
3369                        }
3370                    }
3371                }
3372            })
3373            // Poll each future for each collection concurrently, maximum of 10
3374            // at a time.
3375            // TODO(benesch): the concurrency here should be configurable
3376            // via LaunchDarkly.
3377            .buffer_unordered(10)
3378            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3379            // The closure passed to `for_each` must remain fast or we risk
3380            // starving the finalization futures of calls to `poll`.
3381            .for_each(|shard_id| async move {
3382                match shard_id {
3383                    None => metrics.finalization_failed.inc(),
3384                    Some(shard_id) => {
3385                        // We make successfully finalized shards available for
3386                        // removal from the finalization WAL one by one, so that
3387                        // a handful of stuck shards don't prevent us from
3388                        // removing the shards that have made progress. The
3389                        // overhead of repeatedly acquiring and releasing the
3390                        // locks is negligible.
3391                        {
3392                            let mut finalizable_shards = finalizable_shards.lock();
3393                            let mut finalized_shards = finalized_shards.lock();
3394                            finalizable_shards.remove(&shard_id);
3395                            finalized_shards.insert(shard_id);
3396                        }
3397
3398                        metrics.finalization_succeeded.inc();
3399                    }
3400                }
3401            })
3402            .await;
3403
3404        debug!("done finalizing shards");
3405    }
3406}
3407
3408#[derive(Debug)]
3409pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
3410    /// Stats for a shard with an "eager" upper (one that continually advances
3411    /// as time passes, even if no writes are coming in).
3412    Direct(Antichain<T>),
3413    /// Stats for a shard with a "lazy" upper (one that only physically advances
3414    /// in response to writes).
3415    Txns(DataSnapshot<T>),
3416}
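
// For example: a txn-wal-backed table has a "lazy" physical upper, so its
// stats request is routed as `SnapshotStatsAsOf::Txns(data_snapshot)` and
// resolved via `snapshot_stats_from_txn`, while an ordinary source uses
// `SnapshotStatsAsOf::Direct` with a plain as-of frontier (see the
// `BackgroundCmd::SnapshotStats` handling in `BackgroundTask::run`).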
3417
3418#[cfg(test)]
3419mod tests {
3420    use std::str::FromStr;
3421    use std::sync::Arc;
3422
3423    use mz_build_info::DUMMY_BUILD_INFO;
3424    use mz_dyncfg::ConfigSet;
3425    use mz_ore::assert_err;
3426    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
3427    use mz_ore::now::SYSTEM_TIME;
3428    use mz_ore::url::SensitiveUrl;
3429    use mz_persist_client::cache::PersistClientCache;
3430    use mz_persist_client::cfg::PersistConfig;
3431    use mz_persist_client::rpc::PubSubClientConnection;
3432    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
3433    use mz_persist_types::codec_impls::UnitSchema;
3434    use mz_repr::{RelationDesc, Row};
3435    use mz_secrets::InMemorySecretsController;
3436
3437    use super::*;
3438
3439    #[mz_ore::test(tokio::test)]
3440    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
3441    async fn test_snapshot_stats() {
3442        let persist_location = PersistLocation {
3443            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3444            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3445        };
3446        let persist_client = PersistClientCache::new(
3447            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
3448            &MetricsRegistry::new(),
3449            |_, _| PubSubClientConnection::noop(),
3450        );
3451        let persist_client = Arc::new(persist_client);
3452
3453        let (cmds_tx, mut background_task) =
3454            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
3455        let background_task =
3456            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
3457                background_task.run().await
3458            });
3459
3460        let persist = persist_client.open(persist_location).await.unwrap();
3461
3462        let shard_id = ShardId::new();
3463        let since_handle = persist
3464            .open_critical_since(
3465                shard_id,
3466                PersistClient::CONTROLLER_CRITICAL_SINCE,
3467                Diagnostics::for_tests(),
3468            )
3469            .await
3470            .unwrap();
3471        let write_handle = persist
3472            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3473                shard_id,
3474                Arc::new(RelationDesc::empty()),
3475                Arc::new(UnitSchema),
3476                Diagnostics::for_tests(),
3477            )
3478            .await
3479            .unwrap();
3480
3481        cmds_tx
3482            .send(BackgroundCmd::Register {
3483                id: GlobalId::User(1),
3484                is_in_txns: false,
3485                since_handle: SinceHandleWrapper::Critical(since_handle),
3486                write_handle,
3487            })
3488            .unwrap();
3489
3490        let mut write_handle = persist
3491            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3492                shard_id,
3493                Arc::new(RelationDesc::empty()),
3494                Arc::new(UnitSchema),
3495                Diagnostics::for_tests(),
3496            )
3497            .await
3498            .unwrap();
3499
3500        // No stats for unknown GlobalId.
3501        let stats =
3502            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
3503        assert_err!(stats);
3504
3505        // Stats don't resolve for as_of past the upper.
3506        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3507        assert_none!(stats_fut.now_or_never());
3508
3509        // Call it again because `now_or_never` consumed our future and it's not cloneable.
3510        let stats_ts1_fut =
3511            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3512
3513        // Write some data.
3514        let data = (
3515            (SourceData(Ok(Row::default())), ()),
3516            mz_repr::Timestamp::from(0),
3517            1i64,
3518        );
3519        let () = write_handle
3520            .compare_and_append(
3521                &[data],
3522                Antichain::from_elem(0.into()),
3523                Antichain::from_elem(1.into()),
3524            )
3525            .await
3526            .unwrap()
3527            .unwrap();
3528
3529        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
3530        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
3531            .await
3532            .unwrap();
3533        assert_eq!(stats.num_updates, 1);
3534
3535        // Write more data and unblock the ts 1 call
3536        let data = (
3537            (SourceData(Ok(Row::default())), ()),
3538            mz_repr::Timestamp::from(1),
3539            1i64,
3540        );
3541        let () = write_handle
3542            .compare_and_append(
3543                &[data],
3544                Antichain::from_elem(1.into()),
3545                Antichain::from_elem(2.into()),
3546            )
3547            .await
3548            .unwrap()
3549            .unwrap();
3550
3551        let stats = stats_ts1_fut.await.unwrap();
3552        assert_eq!(stats.num_updates, 2);
3553
3554        // Make sure the background task stays alive until at least this point.
3555        drop(background_task);
3556    }
3557
3558    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
3559        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
3560        id: GlobalId,
3561        as_of: Antichain<T>,
3562    ) -> Result<SnapshotStats, StorageError<T>> {
3563        let (tx, rx) = oneshot::channel();
3564        cmds_tx
3565            .send(BackgroundCmd::SnapshotStats(
3566                id,
3567                SnapshotStatsAsOf::Direct(as_of),
3568                tx,
3569            ))
3570            .unwrap();
3571        let res = rx.await.expect("BackgroundTask should be live").0;
3572
3573        res.await
3574    }
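
    // A minimal sketch (not exercising persist) of the read-capability diff
    // pattern used in `BackgroundTask::update_write_frontiers`: add the new
    // implied capability, swap, then retract the old one.
    #[mz_ore::test]
    fn read_capability_swap_sketch() {
        let mut implied_capability = Antichain::from_elem(5u64);
        let mut new_read_capability = Antichain::from_elem(8u64);

        let mut update = ChangeBatch::new();
        update.extend(new_read_capability.iter().map(|time| (*time, 1)));
        std::mem::swap(&mut implied_capability, &mut new_read_capability);
        update.extend(new_read_capability.iter().map(|time| (*time, -1)));

        // The batch now adds the new capability and retracts the old one.
        let changes: BTreeMap<u64, i64> = update.drain().collect();
        let expected: BTreeMap<u64, i64> = [(5, -1), (8, 1)].into_iter().collect();
        assert_eq!(changes, expected);
    }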
3575
3576    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
3577        fn new_for_test(
3578            _persist_location: PersistLocation,
3579            _persist_client: Arc<PersistClientCache>,
3580        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
3581            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
3582            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
3583            let connection_context =
3584                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));
3585
3586            let task = Self {
3587                config: Arc::new(Mutex::new(StorageConfiguration::new(
3588                    connection_context,
3589                    ConfigSet::default(),
3590                ))),
3591                cmds_tx: cmds_tx.clone(),
3592                cmds_rx,
3593                holds_rx,
3594                finalizable_shards: Arc::new(ShardIdSet::new(
3595                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
3596                )),
3597                collections: Arc::new(Mutex::new(BTreeMap::new())),
3598                shard_by_id: BTreeMap::new(),
3599                since_handles: BTreeMap::new(),
3600                txns_handle: None,
3601                txns_shards: BTreeSet::new(),
3602            };
3603
3604            (cmds_tx, task)
3605        }
3606    }
3607}