// mz_storage_client/storage_collections.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::iter;
16use std::num::NonZeroI64;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::future::BoxFuture;
23use futures::stream::{BoxStream, FuturesUnordered};
24use futures::{Future, FutureExt, StreamExt};
25use itertools::Itertools;
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::NowFn;
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::{Opaque, SinceHandle};
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::codec_impls::UnitSchema;
40use mz_persist_types::txn::TxnsCodec;
41use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
42use mz_storage_types::StorageDiff;
43use mz_storage_types::configuration::StorageConfiguration;
44use mz_storage_types::connections::ConnectionContext;
45use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
46use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
47use mz_storage_types::errors::CollectionMissing;
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
52use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
53use mz_txn_wal::metrics::Metrics as TxnMetrics;
54use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
55use mz_txn_wal::txns::TxnsHandle;
56use timely::PartialOrder;
57use timely::progress::frontier::MutableAntichain;
58use timely::progress::{Antichain, ChangeBatch};
59use tokio::sync::{mpsc, oneshot};
60use tokio::time::MissedTickBehavior;
61use tracing::{debug, info, trace, warn};
62
63use crate::client::TimestamplessUpdateBuilder;
64use crate::controller::{
65    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
66};
67use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
68
69mod metrics;
70
/// An abstraction for keeping track of storage collections and managing access
/// to them.
///
/// Responsibilities:
///
/// - Keeps a critical persist handle for holding the since of collections
///   where it needs to be.
///
/// - Drives the since forward based on the upper of a collection and a
///   [ReadPolicy].
///
/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
///   advancing while it needs to be read at a specific time.
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    /// On boot, reconcile this [StorageCollections] with outside state. We get
    /// a [StorageTxn] where we can record any durable state that we need.
    ///
    /// We get `init_ids`, which tells us about all collections that currently
    /// exist, so that we can record durable state for those that _we_ don't
    /// know yet about.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Update storage configuration with new parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the identified collection.
    fn collection_frontiers(&self, id: GlobalId) -> Result<CollectionFrontiers, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Atomically gets and returns the frontiers of all the identified
    /// collections.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers>, CollectionMissing>;

    /// Atomically gets and returns the frontiers of all active collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    ///
    /// Note that this async function itself returns a future. We may
    /// need to block on the stats being available, but don't want to hold a reference
    /// to the controller for too long... so the outer future holds a reference to the
    /// controller but returns quickly, and the inner future is slow but does not
    /// reference the controller.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>>;

    /// Returns a snapshot of the contents of collection `id` at the largest
    /// readable `as_of`.
    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>>;

    /// Generates a snapshot of the contents of collection `id` at `as_of` and
    /// streams out all of the updates in bounded memory.
    ///
    /// The output is __not__ consolidated.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    >;

    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
    /// updates for the provided [`GlobalId`].
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
    >;

    /// Update the given [`StorageTxn`] with the appropriate metadata given the
    /// IDs to add and drop.
    ///
    /// The data modified in the `StorageTxn` must be made available in all
    /// subsequent calls that require [`StorageMetadata`] as a parameter.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError>;

    /// Create the collections described by the individual
    /// [CollectionDescriptions](CollectionDescription).
    ///
    /// Each command carries the source id, the source description, and any
    /// associated metadata needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and
    /// they are now valid to use in queries at times beyond the initial `since`
    /// frontiers. Each collection also acquires a read capability at this
    /// frontier, which will need to be repeatedly downgraded with
    /// `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of
    /// different collections and leave the [StorageCollections] in an
    /// inconsistent state. It is almost always wrong to do anything but abort
    /// the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are
    /// available for reads. (We might later give non-tables the same treatment,
    /// but hold off on that initially.) Callers must provide a Some if any of
    /// the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Updates the [`RelationDesc`] for the specified table.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sources and allows their resources to
    /// be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers.
    /// Currently when the controller starts/restarts it has no durable state.
    /// That means that it has no way of remembering any past commands sent. In
    /// the future we plan on persisting state for the controller so that it is
    /// aware of past commands. Therefore this method is for dropping sources
    /// that we know to have been previously created, but have been forgotten by
    /// the controller due to a restart. Once command history becomes durable we
    /// can remove this method and use the normal `drop_sources`.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Assigns a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented, and repeated
    /// identifiers should conclude with the last policy. Changing a policy will
    /// immediately downgrade the read capability if appropriate, but it will
    /// not "recover" the read capability if the prior capability is already
    /// ahead of it.
    ///
    /// This [StorageCollections] may include its own overrides on these
    /// policies.
    ///
    /// Identifiers not present in `policies` retain their existing read
    /// policies.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>);

    /// Acquires and returns the earliest possible read holds for the specified
    /// collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold>, CollectionMissing>;

    /// Get the time dependence for a storage collection. Returns no value if unknown or if
    /// the object isn't managed by storage.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    /// Returns the state of [`StorageCollections`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
304
/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
/// consolidated form.
pub struct SnapshotCursor {
    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
    // least as long as the cursor itself, which holds part leases. Bundling them together!
    pub _read_handle: ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    // The underlying persist cursor from which updates are read.
    pub cursor: Cursor<SourceData, (), Timestamp, StorageDiff>,
}
313
314impl SnapshotCursor {
315    pub async fn next(
316        &mut self,
317    ) -> Option<impl Iterator<Item = (SourceData, Timestamp, StorageDiff)> + Sized + '_> {
318        let iter = self.cursor.next().await?;
319        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
320    }
321}
322
/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<Timestamp>,

    /// The since frontier that is implied by the collection's existence,
    /// disregarding any read holds.
    ///
    /// Concretely, it is the since frontier that is implied by the combination
    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
    /// derived from the write frontier using the [ReadPolicy].
    pub implied_capability: Antichain<Timestamp>,

    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes the
    /// implied capability.
    pub read_capabilities: Antichain<Timestamp>,
}
344
/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
/// background task for doing work concurrently, in the background.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl {
    /// The fencing token for this instance of [StorageCollections], and really
    /// all of the controllers and Coordinator.
    envd_epoch: NonZeroI64,

    /// Whether or not this [StorageCollections] is in read-only mode.
    ///
    /// When in read-only mode, we are not allowed to affect changes to external
    /// systems, including, for example, acquiring and downgrading critical
    /// [SinceHandles](SinceHandle).
    read_only: bool,

    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
    /// been persisted by the caller of [StorageCollections::prepare_state].
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
    /// shards here until we are given a chance to let our callers know that
    /// these have been finalized, for example via
    /// [StorageCollections::prepare_state].
    finalized_shards: Arc<ShardIdSet>,

    /// Collections maintained by this [StorageCollections].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,

    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<Timestamp>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn shard as it was when we booted. We forward the
    /// upper of created/registered tables to make sure that their uppers are
    /// not less than the initially known txn upper.
    ///
    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
    /// existing indexes when bootstrapping, where tables that have an upper
    /// that is less than the initially known txn upper can lead to indexes that
    /// cannot hydrate in read-only mode.
    initial_txn_upper: Antichain<Timestamp>,

    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,

    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd>,

    /// For sending updates about read holds to our internal task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<Timestamp>)>,

    /// Handles to tasks we own, making sure they're dropped when we are.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}
405
406// Supporting methods for implementing [StorageCollections].
407//
408// Almost all internal methods that are the backing implementation for a trait
409// method have the `_inner` suffix.
410//
411// We follow a pattern where `_inner` methods get a mutable reference to the
412// shared collections state, and it's the public-facing method that locks the
413/// A boxed stream of source data with timestamps and diffs.
414type SourceDataStream = BoxStream<'static, (SourceData, Timestamp, StorageDiff)>;
415
416// state for the duration of its invocation. This allows calling other `_inner`
417// methods from within `_inner` methods.
418impl StorageCollectionsImpl {
    /// Creates and returns a new [StorageCollections].
    ///
    /// Note that when creating a new [StorageCollections], you must also
    /// reconcile it with the previous state using
    /// [StorageCollections::initialize_state],
    /// [StorageCollections::prepare_state], and
    /// [StorageCollections::create_collections_for_bootstrap].
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        // This value must be already installed because we must ensure it's
        // durably recorded before it is used, otherwise we risk leaking persist
        // state.
        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // We have to initialize, so that TxnsRead::start() below does not
        // block.
        let _txns_handle: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow> =
            TxnsHandle::open(
                Timestamp::MIN,
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;

        // For handing to the background task, for listening to upper updates.
        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        // Captured before spawning any tasks, so that it reflects the txn
        // upper as of boot. See the field documentation for why we need this.
        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            // Wrapped in `abort_on_drop` so the tasks die with the last clone
            // of this struct.
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }
544
    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        SinceHandleWrapper,
    ) {
        // In read-only mode we must not acquire the critical handle, so we
        // hold the since with a leased read handle instead.
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            // We're managing the data for this shard in read-write mode, which would fence out other
            // processes in read-only mode; it's safe to upgrade the metadata version.
            persist_client
                .upgrade_version::<SourceData, (), Timestamp, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // N.B.
        // Fetch the most recent upper for the write handle. Otherwise, this may
        // be behind the since of the since handle. It's vital this happens AFTER
        // we create the since handle as it needs to be linearized with that
        // operation. It may be true that creating the write handle after the
        // since handle already ensures this, but we do this out of an abundance
        // of caution.
        //
        // Note that this returns the upper, but also sets it on the handle to
        // be fetched later.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }
607
608    /// Opens a write handle for the given `shard`.
609    async fn open_write_handle(
610        &self,
611        id: &GlobalId,
612        shard: ShardId,
613        relation_desc: RelationDesc,
614        persist_client: &PersistClient,
615    ) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
616        let diagnostics = Diagnostics {
617            shard_name: id.to_string(),
618            handle_purpose: format!("controller data for {}", id),
619        };
620
621        let write = persist_client
622            .open_writer(
623                shard,
624                Arc::new(relation_desc),
625                Arc::new(UnitSchema),
626                diagnostics.clone(),
627            )
628            .await
629            .expect("invalid persist usage");
630
631        write
632    }
633
    /// Opens a critical since handle for the given `shard`.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), Timestamp, StorageDiff> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // Construct the handle in a separate block to ensure all error paths
        // are diverging.
        let since_handle = {
            // This block's aim is to ensure the handle is in terms of our epoch
            // by the time we return it.
            let mut handle = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&PersistEpoch::default()),
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided `since`;
            // this lets materialized views express the since at which their
            // read handles "start."
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(Timestamp::MIN),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            // Compare-and-set loop: either we succeed in stamping the handle
            // with our epoch, or we discover a newer epoch and halt.
            loop {
                let current_epoch: PersistEpoch = handle.opaque().decode();

                // Ensure the current epoch is <= our epoch.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Update the handle's state so that it is in terms of our
                    // epoch.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &Opaque::encode(&current_epoch),
                            (&Opaque::encode(&PersistEpoch::from(our_epoch)), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                    // CaS lost a race; re-read the opaque and retry.
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
713
714    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
715    /// for the given `shard`.
716    ///
717    /// `since` is an optional since that the read handle will be forwarded to
718    /// if it is less than its current since.
719    async fn open_leased_handle(
720        &self,
721        id: &GlobalId,
722        shard: ShardId,
723        relation_desc: RelationDesc,
724        since: Option<&Antichain<Timestamp>>,
725        persist_client: &PersistClient,
726    ) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
727        tracing::debug!(%id, ?since, "opening leased handle");
728
729        let diagnostics = Diagnostics {
730            shard_name: id.to_string(),
731            handle_purpose: format!("controller data for {}", id),
732        };
733
734        let use_critical_since = false;
735        let mut handle: ReadHandle<_, _, _, _> = persist_client
736            .open_leased_reader(
737                shard,
738                Arc::new(relation_desc),
739                Arc::new(UnitSchema),
740                diagnostics.clone(),
741                use_critical_since,
742            )
743            .await
744            .expect("invalid persist usage");
745
746        // Take the join of the handle's since and the provided `since`;
747        // this lets materialized views express the since at which their
748        // read handles "start."
749        let provided_since = match since {
750            Some(since) => since,
751            None => &Antichain::from_elem(Timestamp::MIN),
752        };
753        let since = handle.since().join(provided_since);
754
755        handle.downgrade_since(&since).await;
756
757        handle
758    }
759
760    fn register_handles(
761        &self,
762        id: GlobalId,
763        is_in_txns: bool,
764        since_handle: SinceHandleWrapper,
765        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
766    ) {
767        self.send(BackgroundCmd::Register {
768            id,
769            is_in_txns,
770            since_handle,
771            write_handle,
772        });
773    }
774
775    fn send(&self, cmd: BackgroundCmd) {
776        let _ = self.cmd_tx.send(cmd);
777    }
778
779    async fn snapshot_stats_inner(
780        &self,
781        id: GlobalId,
782        as_of: SnapshotStatsAsOf,
783    ) -> Result<SnapshotStats, StorageError> {
784        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
785        // caller of this one drives it to completion.
786        //
787        // We'd need to either share the critical handle somehow or maybe have
788        // two instances around, one in the worker and one in the
789        // StorageCollections.
790        let (tx, rx) = oneshot::channel();
791        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
792        rx.await.expect("BackgroundTask should be live").0.await
793    }
794
795    /// If this identified collection has a dependency, install a read hold on
796    /// it.
797    ///
798    /// This is necessary to ensure that the dependency's since does not advance
799    /// beyond its dependents'.
800    fn install_collection_dependency_read_holds_inner(
801        &self,
802        self_collections: &mut BTreeMap<GlobalId, CollectionState>,
803        id: GlobalId,
804    ) -> Result<(), StorageError> {
805        let (deps, collection_implied_capability) = match self_collections.get(&id) {
806            Some(CollectionState {
807                storage_dependencies: deps,
808                implied_capability,
809                ..
810            }) => (deps.clone(), implied_capability),
811            _ => return Ok(()),
812        };
813
814        for dep in deps.iter() {
815            let dep_collection = self_collections
816                .get(dep)
817                .ok_or(StorageError::IdentifierMissing(id))?;
818
819            mz_ore::soft_assert_or_log!(
820                PartialOrder::less_equal(
821                    &dep_collection.implied_capability,
822                    collection_implied_capability
823                ),
824                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
825                dep_collection.implied_capability,
826                collection_implied_capability,
827            );
828        }
829
830        self.install_read_capabilities_inner(
831            self_collections,
832            id,
833            &deps,
834            collection_implied_capability.clone(),
835        )?;
836
837        Ok(())
838    }
839
840    /// Returns the given collection's dependencies.
841    fn determine_collection_dependencies(
842        self_collections: &BTreeMap<GlobalId, CollectionState>,
843        source_id: GlobalId,
844        collection_desc: &CollectionDescription,
845    ) -> Result<Vec<GlobalId>, StorageError> {
846        let mut dependencies = Vec::new();
847
848        if let Some(id) = collection_desc.primary {
849            dependencies.push(id);
850        }
851
852        match &collection_desc.data_source {
853            DataSource::Introspection(_)
854            | DataSource::Webhook
855            | DataSource::Table
856            | DataSource::Progress
857            | DataSource::Other => (),
858            DataSource::IngestionExport {
859                ingestion_id,
860                data_config,
861                ..
862            } => {
863                // Ingestion exports depend on their primary source's remap
864                // collection, except when they use a CDCv2 envelope.
865                let source = self_collections
866                    .get(ingestion_id)
867                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
868                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
869                    panic!("SourceExport must refer to a primary source that already exists");
870                };
871
872                match data_config.envelope {
873                    SourceEnvelope::CdcV2 => (),
874                    _ => dependencies.push(*remap_collection_id),
875                }
876            }
877            // Ingestions depend on their remap collection.
878            DataSource::Ingestion(ingestion) => {
879                if ingestion.remap_collection_id != source_id {
880                    dependencies.push(ingestion.remap_collection_id);
881                }
882            }
883            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
884        }
885
886        Ok(dependencies)
887    }
888
889    /// Install read capabilities on the given `storage_dependencies`.
890    #[instrument(level = "debug")]
891    fn install_read_capabilities_inner(
892        &self,
893        self_collections: &mut BTreeMap<GlobalId, CollectionState>,
894        from_id: GlobalId,
895        storage_dependencies: &[GlobalId],
896        read_capability: Antichain<Timestamp>,
897    ) -> Result<(), StorageError> {
898        let mut changes = ChangeBatch::new();
899        for time in read_capability.iter() {
900            changes.update(*time, 1);
901        }
902
903        if tracing::span_enabled!(tracing::Level::TRACE) {
904            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
905            let user_capabilities = self_collections
906                .iter_mut()
907                .filter(|(id, _c)| id.is_user())
908                .map(|(id, c)| {
909                    let updates = c.read_capabilities.updates().cloned().collect_vec();
910                    (*id, c.implied_capability.clone(), updates)
911                })
912                .collect_vec();
913
914            trace!(
915                %from_id,
916                ?storage_dependencies,
917                ?read_capability,
918                ?user_capabilities,
919                "install_read_capabilities_inner");
920        }
921
922        let mut storage_read_updates = storage_dependencies
923            .iter()
924            .map(|id| (*id, changes.clone()))
925            .collect();
926
927        StorageCollectionsImpl::update_read_capabilities_inner(
928            &self.cmd_tx,
929            self_collections,
930            &mut storage_read_updates,
931        );
932
933        if tracing::span_enabled!(tracing::Level::TRACE) {
934            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
935            let user_capabilities = self_collections
936                .iter_mut()
937                .filter(|(id, _c)| id.is_user())
938                .map(|(id, c)| {
939                    let updates = c.read_capabilities.updates().cloned().collect_vec();
940                    (*id, c.implied_capability.clone(), updates)
941                })
942                .collect_vec();
943
944            trace!(
945                %from_id,
946                ?storage_dependencies,
947                ?read_capability,
948                ?user_capabilities,
949                "after install_read_capabilities_inner!");
950        }
951
952        Ok(())
953    }
954
955    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<Timestamp>, StorageError> {
956        let metadata = &self.collection_metadata(id)?;
957        let persist_client = self
958            .persist
959            .open(metadata.persist_location.clone())
960            .await
961            .unwrap();
962        // Duplicate part of open_data_handles here because we don't need the
963        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
964        let diagnostics = Diagnostics {
965            shard_name: id.to_string(),
966            handle_purpose: format!("controller data for {}", id),
967        };
968        // NB: Opening a WriteHandle is cheap if it's never used in a
969        // compare_and_append operation.
970        let write = persist_client
971            .open_writer::<SourceData, (), Timestamp, StorageDiff>(
972                metadata.data_shard,
973                Arc::new(metadata.relation_desc.clone()),
974                Arc::new(UnitSchema),
975                diagnostics.clone(),
976            )
977            .await
978            .expect("invalid persist usage");
979        Ok(write.shared_upper())
980    }
981
982    async fn read_handle_for_snapshot(
983        persist: Arc<PersistClientCache>,
984        metadata: &CollectionMetadata,
985        id: GlobalId,
986    ) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
987        let persist_client = persist
988            .open(metadata.persist_location.clone())
989            .await
990            .unwrap();
991
992        // We create a new read handle every time someone requests a snapshot
993        // and then immediately expire it instead of keeping a read handle
994        // permanently in our state to avoid having it heartbeat continually.
995        // The assumption is that calls to snapshot are rare and therefore worth
996        // it to always create a new handle.
997        let read_handle = persist_client
998            .open_leased_reader::<SourceData, (), _, _>(
999                metadata.data_shard,
1000                Arc::new(metadata.relation_desc.clone()),
1001                Arc::new(UnitSchema),
1002                Diagnostics {
1003                    shard_name: id.to_string(),
1004                    handle_purpose: format!("snapshot {}", id),
1005                },
1006                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1007            )
1008            .await
1009            .expect("invalid persist usage");
1010        Ok(read_handle)
1011    }
1012
    /// Returns the contents of collection `id` at time `as_of`, as a boxed
    /// future.
    ///
    /// Reads go through txn-wal when the collection is registered with a txns
    /// shard, and directly against persist otherwise. A failed read surfaces
    /// as `StorageError::ReadBeforeSince`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
        txns_read: &TxnsRead<Timestamp>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // If the collection lives in a txns shard, sanity-check that it is
        // the same txns shard we know about and keep our own handle to it.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // We're not using txn-wal for tables, so we can take a snapshot directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // We _are_ using txn-wal for tables. It advances the physical upper of the
                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
                    // unblocked.
                    //
                    // Consider the following scenario:
                    // - Table A is written to via txns at time 5
                    // - Tables other than A are written to via txns consuming timestamps up to 10
                    // - We'd like to read A at 7
                    // - The application process of A's txn has advanced the upper to 5+1, but we need
                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
                    // - This branch allows it to handle that advancing the physical upper of Table A to
                    //   10 (NB but only once we see it get past the write at 5!)
                    // - Then we can read it normally.
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    // Unpack the rows, surfacing any error values stored in
                    // the collection itself.
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // TODO(petrosagg): We should accumulate the errors too and let the user
                        // interpret the result
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                // The snapshot call itself fails when `as_of` is no longer
                // readable.
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }
1072
1073    fn snapshot_and_stream(
1074        &self,
1075        id: GlobalId,
1076        as_of: Timestamp,
1077        txns_read: &TxnsRead<Timestamp>,
1078    ) -> BoxFuture<'static, Result<SourceDataStream, StorageError>> {
1079        use futures::stream::StreamExt;
1080
1081        let metadata = match self.collection_metadata(id) {
1082            Ok(metadata) => metadata.clone(),
1083            Err(e) => return async { Err(e.into()) }.boxed(),
1084        };
1085        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1086            assert_eq!(txns_id, txns_read.txns_id());
1087            txns_read.clone()
1088        });
1089        let persist = Arc::clone(&self.persist);
1090
1091        async move {
1092            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1093            let stream = match txns_read {
1094                None => {
1095                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1096                    read_handle
1097                        .snapshot_and_stream(Antichain::from_elem(as_of))
1098                        .await
1099                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1100                        .boxed()
1101                }
1102                Some(txns_read) => {
1103                    txns_read.update_gt(as_of).await;
1104                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
1105                    data_snapshot
1106                        .snapshot_and_stream(&mut read_handle)
1107                        .await
1108                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1109                        .boxed()
1110                }
1111            };
1112
1113            // Map our stream, unwrapping Persist internal errors.
1114            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
1115            Ok(stream)
1116        }
1117        .boxed()
1118    }
1119
    /// Applies the given read `policies` to the identified collections,
    /// recomputing each collection's implied capability from its current
    /// write frontier.
    ///
    /// # Panics
    ///
    /// Panics if any of the identified collections is absent.
    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState>,
        policies: Vec<(GlobalId, ReadPolicy)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            // The capability the new policy allows, given the collection's
            // current write frontier.
            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            // Only ever move the implied capability forward (a downgrade);
            // if the policy would move it backward, leave it untouched.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                // Record the swap as +1s for the new capability's times and
                // -1s for the old one's (after the swap, `new_read_capability`
                // holds the old capability).
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            // Store the new policy on the collection.
            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        // Apply all accumulated capability changes at once, so they cascade
        // to dependencies together.
        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }
1166
    // This is an associated function (it takes no `self`) so that we can
    // share it with the task that updates the persist handles and also has a
    // reference to the shared collections state.
    //
    // Applies `updates` to the collections' read capabilities, cascading the
    // resulting changes to each collection's storage dependencies, and
    // forwards the resulting since downgrades to the background task.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd>,
        collections: &mut BTreeMap<GlobalId, CollectionState>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<Timestamp>>,
    ) {
        // Location to record consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // We must not rely on any specific relative ordering of `GlobalId`s.
        // That said, it is reasonable to assume that collections generally have
        // greater IDs than their dependencies, so starting with the largest is
        // a useful optimization.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Continue purely negative updates. Someone has probably
                    // already dropped this collection!
                    continue;
                }
            };

            // Sanity checks: applying the update must neither drive any
            // capability count negative, nor install a capability before the
            // current frontier of read capabilities.
            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            // Apply the changes and repurpose `update` to hold the net
            // changes to the collection's overall read frontier.
            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                %id,
                ?collection.storage_dependencies,
                ?update,
                "forwarding update to storage dependencies");
            }

            // Cascade the net changes to the collection's storage
            // dependencies; those entries are processed by later iterations
            // of this loop.
            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            // Accumulate the net changes and the resulting read frontier for
            // this collection.
            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate our net compute actions into downgrades of persist sinces.
        // The actual downgrades are performed by a Tokio task asynchronously.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // If the collection has a "primary" collection, let that primary drive compaction.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = collection.primary.is_none();

                // An empty read frontier means the collection can never be
                // read again, so its state can be dropped.
                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
1279
    /// Remove any shards that we know are finalized
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        // A shard is known-finalized once it no longer appears in the set of
        // unfinalized shards, so retain only the shards that
        // `storage_metadata` still lists as unfinalized.
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
1286}
1287
1288// See comments on the above impl for StorageCollectionsImpl.
1289#[async_trait]
1290impl StorageCollections for StorageCollectionsImpl {
    /// Initializes durable state in `txn`: prepares state for those of
    /// `init_ids` that don't yet have collection metadata, and seeds
    /// `finalizable_shards` with the shards left over from collections
    /// dropped in a previous epoch.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        // Determine which collections we do not yet have metadata for.
        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        // NOTE(review): the empty set/map arguments presumably mean "no drops
        // and no migrations" during initialization — confirm against
        // `prepare_state`'s signature.
        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        // All shards that belong to collections dropped in the last epoch are
        // eligible for finalization.
        //
        // n.b. this introduces an unlikely race condition: if a collection is
        // dropped from the catalog, but the dataflow is still running on a
        // worker, assuming the shard is safe to finalize on reboot may cause
        // the cluster to panic.
        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }
1326
1327    fn update_parameters(&self, config_params: StorageParameters) {
1328        // We serialize the dyncfg updates in StorageParameters, but configure
1329        // persist separately.
1330        config_params.dyncfg_updates.apply(self.persist.cfg());
1331
1332        self.config
1333            .lock()
1334            .expect("lock poisoned")
1335            .update(config_params);
1336    }
1337
1338    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1339        let collections = self.collections.lock().expect("lock poisoned");
1340
1341        collections
1342            .get(&id)
1343            .map(|c| c.collection_metadata.clone())
1344            .ok_or(CollectionMissing(id))
1345    }
1346
1347    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1348        let collections = self.collections.lock().expect("lock poisoned");
1349
1350        collections
1351            .iter()
1352            .filter(|(_id, c)| !c.is_dropped())
1353            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1354            .collect()
1355    }
1356
1357    fn collections_frontiers(
1358        &self,
1359        ids: Vec<GlobalId>,
1360    ) -> Result<Vec<CollectionFrontiers>, CollectionMissing> {
1361        if ids.is_empty() {
1362            return Ok(vec![]);
1363        }
1364
1365        let collections = self.collections.lock().expect("lock poisoned");
1366
1367        let res = ids
1368            .into_iter()
1369            .map(|id| {
1370                collections
1371                    .get(&id)
1372                    .map(|c| CollectionFrontiers {
1373                        id: id.clone(),
1374                        write_frontier: c.write_frontier.clone(),
1375                        implied_capability: c.implied_capability.clone(),
1376                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1377                    })
1378                    .ok_or(CollectionMissing(id))
1379            })
1380            .collect::<Result<Vec<_>, _>>()?;
1381
1382        Ok(res)
1383    }
1384
1385    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers> {
1386        let collections = self.collections.lock().expect("lock poisoned");
1387
1388        let res = collections
1389            .iter()
1390            .filter(|(_id, c)| !c.is_dropped())
1391            .map(|(id, c)| CollectionFrontiers {
1392                id: id.clone(),
1393                write_frontier: c.write_frontier.clone(),
1394                implied_capability: c.implied_capability.clone(),
1395                read_capabilities: c.read_capabilities.frontier().to_owned(),
1396            })
1397            .collect_vec();
1398
1399        res
1400    }
1401
    /// Returns statistics for a snapshot of collection `id` at `as_of`,
    /// routing through txn-wal when the collection is registered with a txns
    /// shard.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let metadata = self.collection_metadata(id)?;

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let as_of = match metadata.txns_shard.as_ref() {
            // Not txn-wal backed: stats can be computed directly at `as_of`.
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                // Sanity-check that the collection's txns shard is the one we
                // know about.
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                // See the timestamp-advancement comments in `Self::snapshot`
                // for why the txns shard is consulted before reading.
                self.txns_read.update_gt(as_of).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of)
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }
1428
    /// Returns statistics about the persist parts backing collection `id` at
    /// `as_of`, as a boxed future.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>> {
        // Resolve the collection's metadata first so an unknown collection
        // fails early with a ready error future.
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // For txn-wal-backed collections, obtain a data snapshot whose
        // physical upper has been advanced past `as_of` first.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                // Sanity-check that the collection's txns shard is the one we
                // know about.
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(*as_of).await;
                let data_snapshot = self.txns_read.data_snapshot(data_shard, *as_of).await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // Expire the one-off read handle eagerly so it stops
            // heartbeating (see `read_handle_for_snapshot`).
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1480
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        // Delegates to the inherent `snapshot` method (which takes the
        // txn-wal read handle explicitly); inherent methods take precedence
        // over this trait method during resolution.
        self.snapshot(id, as_of, &self.txns_read)
    }
1488
1489    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError> {
1490        let upper = self.recent_upper(id).await?;
1491        let res = match upper.as_option() {
1492            Some(f) if f > &Timestamp::MIN => {
1493                let as_of = f.step_back().unwrap();
1494
1495                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1496                snapshot
1497                    .into_iter()
1498                    .map(|(row, diff)| {
1499                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1500                        row
1501                    })
1502                    .collect()
1503            }
1504            Some(_min) => {
1505                // The collection must be empty!
1506                Vec::new()
1507            }
1508            // The collection is closed, we cannot determine a latest read
1509            // timestamp based on the upper.
1510            _ => {
1511                return Err(StorageError::InvalidUsage(
1512                    "collection closed, cannot determine a read timestamp based on the upper"
1513                        .to_string(),
1514                ));
1515            }
1516        };
1517
1518        Ok(res)
1519    }
1520
1521    fn snapshot_cursor(
1522        &self,
1523        id: GlobalId,
1524        as_of: Timestamp,
1525    ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>> {
1526        let metadata = match self.collection_metadata(id) {
1527            Ok(metadata) => metadata.clone(),
1528            Err(e) => return async { Err(e.into()) }.boxed(),
1529        };
1530        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1531            // Ensure the txn's shard the controller has is the same that this
1532            // collection is registered to.
1533            assert_eq!(txns_id, self.txns_read.txns_id());
1534            self.txns_read.clone()
1535        });
1536        let persist = Arc::clone(&self.persist);
1537
1538        // See the comments in Self::snapshot for what's going on here.
1539        async move {
1540            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1541            let cursor = match txns_read {
1542                None => {
1543                    let cursor = handle
1544                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1545                        .await
1546                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1547                    SnapshotCursor {
1548                        _read_handle: handle,
1549                        cursor,
1550                    }
1551                }
1552                Some(txns_read) => {
1553                    txns_read.update_gt(as_of).await;
1554                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
1555                    let cursor = data_snapshot
1556                        .snapshot_cursor(&mut handle, |_| true)
1557                        .await
1558                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1559                    SnapshotCursor {
1560                        _read_handle: handle,
1561                        cursor,
1562                    }
1563                }
1564            };
1565
1566            Ok(cursor)
1567        }
1568        .boxed()
1569    }
1570
    /// Trait-level `snapshot_and_stream`: delegates to the inherent helper of
    /// the same name (note the extra argument — this is not recursion),
    /// passing along this controller's txn-wal read handle.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    > {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }
1581
1582    fn create_update_builder(
1583        &self,
1584        id: GlobalId,
1585    ) -> BoxFuture<
1586        'static,
1587        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
1588    > {
1589        let metadata = match self.collection_metadata(id) {
1590            Ok(m) => m,
1591            Err(e) => return Box::pin(async move { Err(e.into()) }),
1592        };
1593        let persist = Arc::clone(&self.persist);
1594
1595        async move {
1596            let persist_client = persist
1597                .open(metadata.persist_location.clone())
1598                .await
1599                .expect("invalid persist usage");
1600            let write_handle = persist_client
1601                .open_writer::<SourceData, (), Timestamp, StorageDiff>(
1602                    metadata.data_shard,
1603                    Arc::new(metadata.relation_desc.clone()),
1604                    Arc::new(UnitSchema),
1605                    Diagnostics {
1606                        shard_name: id.to_string(),
1607                        handle_purpose: format!("create write batch {}", id),
1608                    },
1609                )
1610                .await
1611                .expect("invalid persist usage");
1612            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1613
1614            Ok(builder)
1615        }
1616        .boxed()
1617    }
1618
1619    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError> {
1620        let collections = self.collections.lock().expect("lock poisoned");
1621
1622        if collections.contains_key(&id) {
1623            Ok(())
1624        } else {
1625            Err(StorageError::IdentifierMissing(id))
1626        }
1627    }
1628
1629    async fn prepare_state(
1630        &self,
1631        txn: &mut (dyn StorageTxn + Send),
1632        ids_to_add: BTreeSet<GlobalId>,
1633        ids_to_drop: BTreeSet<GlobalId>,
1634        ids_to_register: BTreeMap<GlobalId, ShardId>,
1635    ) -> Result<(), StorageError> {
1636        txn.insert_collection_metadata(
1637            ids_to_add
1638                .into_iter()
1639                .map(|id| (id, ShardId::new()))
1640                .collect(),
1641        )?;
1642        txn.insert_collection_metadata(ids_to_register)?;
1643
1644        // Delete the metadata for any dropped collections.
1645        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1646
1647        // Only finalize the shards of dropped collections that don't have a primary.
1648        // Otherwise the shard might still be in use by the primary.
1649        let mut dropped_shards = BTreeSet::new();
1650        {
1651            let collections = self.collections.lock().expect("poisoned");
1652            for (id, shard) in dropped_mappings {
1653                let coll = collections.get(&id).expect("must exist");
1654                if coll.primary.is_none() {
1655                    dropped_shards.insert(shard);
1656                }
1657            }
1658        }
1659        txn.insert_unfinalized_shards(dropped_shards)?;
1660
1661        // Reconcile any shards we've successfully finalized with the shard
1662        // finalization collection.
1663        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1664        txn.mark_shards_as_finalized(finalized_shards);
1665
1666        Ok(())
1667    }
1668
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    /// Creates the given collections during bootstrap: enriches each
    /// description with persist metadata, opens write/since handles for all
    /// collections concurrently, and installs the resulting collection state
    /// in dependency order.
    ///
    /// `register_ts` must be provided when any of `collections` is a table;
    /// new tables are presented as springing into existence at that
    /// timestamp. Collections in `migrated_storage_collections` are exempt
    /// from the initial since advancement.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        // A collection is treated as txn-wal-managed if it has a txns shard,
        // except that in read-only mode migrated collections are excluded.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. We must not:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        // After dedup, any remaining adjacent equal ids mean the same id was
        // submitted with two different descriptions — reject that.
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional
        // metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open `SinceHandle`s for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Reborrow the `&mut self` as immutable, as all the concurrent work to
        // be processed in this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError>| {
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    // If this collection has a primary, the primary is responsible for downgrading
                    // the critical since and it would be an error if we did so here while opening
                    // the since handle.
                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            // Only advance brand-new tables (since still at the
                            // minimum); migrated collections keep their since.
                            if since_handle.since().elements() == &[Timestamp::MIN]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts)),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError>((id, description, write, since_handle, metadata))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[Timestamp::MIN]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            // Determine the time dependence of the collection.
            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    // Only the epoch timeline follows wall-clock.
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                // Kafka, Postgres, MySql, and SQL Server sources all
                                // follow wall clock.
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                // Load generators not further specified.
                                LoadGenerator(_) => None,
                            }
                        }
                        IngestionExport { ingestion_id, .. } => {
                            // Exports inherit the time dependence of their
                            // parent ingestion.
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        // Introspection, other, progress, table, and webhook sources follow wall clock.
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        // Materialized views, continual tasks, etc, aren't managed by storage.
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport { .. } => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2014
    /// Registers a new version (`new_collection`) of an existing table
    /// (`existing_collection`) with an evolved schema (`new_desc`): evolves
    /// the schema of the shared persist shard, then wires the two collections
    /// together so the old version reads as a "projection" of the new one.
    ///
    /// `expected_version` is the version the caller believes the table is
    /// currently at; a mismatch results in an error.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError> {
        // Both versions of the table are backed by the same data shard.
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        // Evolve the schema of this shard.
        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        // We map the Adapter's RelationVersion 1:1 with SchemaId.
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        // Translate the persist result into our error space; only `Ok` falls
        // through.
        match schema_result {
            CaESchema::Ok(id) => id,
            // TODO(alter_table): If we get an expected mismatch we should retry.
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        // Once the new schema is registered we can open new data handles.
        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // TODO(alter_table): Do we need to advance the since of the table to match the time this
        // new version was registered with txn-wal?

        // Great! Our new schema is registered with Persist, now we need to update our internal
        // data structures.
        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            // Update the existing collection so we know it's a "projection" of this new one.
            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // A higher level should already be asserting this, but let's make sure.
            assert_none!(existing.primary);

            // The existing version of the table will depend on the new version.
            existing.primary = Some(new_collection);
            existing.storage_dependencies.push(new_collection);

            // Copy over the frontiers from the previous version.
            // The new table starts with two holds - the implied capability, and the hold from
            // the previous version - both at the previous version's read frontier.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            // Determine the relevant read capabilities on the new collection.
            //
            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
            // here, but that only installed a ReadHold on the new collection for the implied
            // capability of the existing collection. This would cause runtime panics because it
            // would eventually result in negative read capabilities.
            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (*t, 1)));

            // Note: The new collection is now the "primary collection".
            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: new_desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                None,
                existing.time_dependence.clone(),
                existing.ingestion_remap_collection_id.clone(),
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            // Add a record of the new collection.
            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        // TODO(alter_table): Support changes to sources.
        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }
2167
2168    fn drop_collections_unvalidated(
2169        &self,
2170        storage_metadata: &StorageMetadata,
2171        identifiers: Vec<GlobalId>,
2172    ) {
2173        debug!(?identifiers, "drop_collections_unvalidated");
2174
2175        let mut self_collections = self.collections.lock().expect("lock poisoned");
2176
2177        // Policies that advance the since to the empty antichain. We do still
2178        // honor outstanding read holds, and collections will only be dropped
2179        // once those are removed as well.
2180        //
2181        // We don't explicitly remove read capabilities! Downgrading the
2182        // frontier of the source to `[]` (the empty Antichain), will propagate
2183        // to the storage dependencies.
2184        let mut finalized_policies = Vec::new();
2185
2186        for id in identifiers {
2187            // Make sure it's still there, might already have been deleted.
2188            let Some(collection) = self_collections.get(&id) else {
2189                continue;
2190            };
2191
2192            // Unless the collection has a primary, its shard must have been previously removed
2193            // by `StorageCollections::prepare_state`.
2194            if collection.primary.is_none() {
2195                let metadata = storage_metadata.get_collection_shard(id);
2196                mz_ore::soft_assert_or_log!(
2197                    matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2198                    "dropping {id}, but drop was not synchronized with storage \
2199                     controller via `prepare_state`"
2200                );
2201            }
2202
2203            finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2204        }
2205
2206        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2207
2208        drop(self_collections);
2209
2210        self.synchronize_finalized_shards(storage_metadata);
2211    }
2212
2213    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>) {
2214        let mut collections = self.collections.lock().expect("lock poisoned");
2215
2216        if tracing::enabled!(tracing::Level::TRACE) {
2217            let user_capabilities = collections
2218                .iter_mut()
2219                .filter(|(id, _c)| id.is_user())
2220                .map(|(id, c)| {
2221                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2222                    (*id, c.implied_capability.clone(), updates)
2223                })
2224                .collect_vec();
2225
2226            trace!(?policies, ?user_capabilities, "set_read_policies");
2227        }
2228
2229        self.set_read_policies_inner(&mut collections, policies);
2230
2231        if tracing::enabled!(tracing::Level::TRACE) {
2232            let user_capabilities = collections
2233                .iter_mut()
2234                .filter(|(id, _c)| id.is_user())
2235                .map(|(id, c)| {
2236                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2237                    (*id, c.implied_capability.clone(), updates)
2238                })
2239                .collect_vec();
2240
2241            trace!(?user_capabilities, "after! set_read_policies");
2242        }
2243    }
2244
2245    fn acquire_read_holds(
2246        &self,
2247        desired_holds: Vec<GlobalId>,
2248    ) -> Result<Vec<ReadHold>, CollectionMissing> {
2249        if desired_holds.is_empty() {
2250            return Ok(vec![]);
2251        }
2252
2253        let mut collections = self.collections.lock().expect("lock poisoned");
2254
2255        let mut advanced_holds = Vec::new();
2256        // We advance the holds by our current since frontier. Can't acquire
2257        // holds for times that have been compacted away!
2258        //
2259        // NOTE: We acquire read holds at the earliest possible time rather than
2260        // at the implied capability. This is so that, for example, adapter can
2261        // acquire a read hold to hold back the frontier, giving the COMPUTE
2262        // controller a chance to also acquire a read hold at that early
2263        // frontier. If/when we change the interplay between adapter and COMPUTE
2264        // to pass around ReadHold tokens, we might tighten this up and instead
2265        // acquire read holds at the implied capability.
2266        for id in desired_holds.iter() {
2267            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
2268            let since = collection.read_capabilities.frontier().to_owned();
2269            advanced_holds.push((*id, since));
2270        }
2271
2272        let mut updates = advanced_holds
2273            .iter()
2274            .map(|(id, hold)| {
2275                let mut changes = ChangeBatch::new();
2276                changes.extend(hold.iter().map(|time| (*time, 1)));
2277                (*id, changes)
2278            })
2279            .collect::<BTreeMap<_, _>>();
2280
2281        StorageCollectionsImpl::update_read_capabilities_inner(
2282            &self.cmd_tx,
2283            &mut collections,
2284            &mut updates,
2285        );
2286
2287        let acquired_holds = advanced_holds
2288            .into_iter()
2289            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2290            .collect_vec();
2291
2292        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2293
2294        Ok(acquired_holds)
2295    }
2296
2297    /// Determine time dependence information for the object.
2298    fn determine_time_dependence(
2299        &self,
2300        id: GlobalId,
2301    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2302        use TimeDependenceError::CollectionMissing;
2303        let collections = self.collections.lock().expect("lock poisoned");
2304        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2305        Ok(state.time_dependence.clone())
2306    }
2307
2308    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2309        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2310        let Self {
2311            envd_epoch,
2312            read_only,
2313            finalizable_shards,
2314            finalized_shards,
2315            collections,
2316            txns_read: _,
2317            config,
2318            initial_txn_upper,
2319            persist_location,
2320            persist: _,
2321            cmd_tx: _,
2322            holds_tx: _,
2323            _background_task: _,
2324            _finalize_shards_task: _,
2325        } = self;
2326
2327        let finalizable_shards: Vec<_> = finalizable_shards
2328            .lock()
2329            .iter()
2330            .map(ToString::to_string)
2331            .collect();
2332        let finalized_shards: Vec<_> = finalized_shards
2333            .lock()
2334            .iter()
2335            .map(ToString::to_string)
2336            .collect();
2337        let collections: BTreeMap<_, _> = collections
2338            .lock()
2339            .expect("poisoned")
2340            .iter()
2341            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2342            .collect();
2343        let config = format!("{:?}", config.lock().expect("poisoned"));
2344
2345        Ok(serde_json::json!({
2346            "envd_epoch": envd_epoch,
2347            "read_only": read_only,
2348            "finalizable_shards": finalizable_shards,
2349            "finalized_shards": finalized_shards,
2350            "collections": collections,
2351            "config": config,
2352            "initial_txn_upper": initial_txn_upper,
2353            "persist_location": format!("{persist_location:?}"),
2354        }))
2355    }
2356}
2357
/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
///
/// When a [StorageCollections] is in read-only mode, we will only ever acquire
/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
/// since is considered a write. Conversely, when in read-write mode, we acquire
/// [SinceHandle].
#[derive(Debug)]
enum SinceHandleWrapper {
    /// A "critical" since handle, held when in read-write mode.
    Critical(SinceHandle<SourceData, (), Timestamp, StorageDiff>),
    /// A leased read handle, held when in read-only mode.
    Leased(ReadHandle<SourceData, (), Timestamp, StorageDiff>),
}
2369
2370impl SinceHandleWrapper {
2371    pub fn since(&self) -> &Antichain<Timestamp> {
2372        match self {
2373            Self::Critical(handle) => handle.since(),
2374            Self::Leased(handle) => handle.since(),
2375        }
2376    }
2377
2378    pub fn opaque(&self) -> PersistEpoch {
2379        match self {
2380            Self::Critical(handle) => handle.opaque().decode(),
2381            Self::Leased(_handle) => {
2382                // The opaque is expected to be used with
2383                // `compare_and_downgrade_since`, and the leased handle doesn't
2384                // have a notion of an opaque. We pretend here and in
2385                // `compare_and_downgrade_since`.
2386                PersistEpoch(None)
2387            }
2388        }
2389    }
2390
2391    pub async fn compare_and_downgrade_since(
2392        &mut self,
2393        expected: &PersistEpoch,
2394        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
2395    ) -> Result<Antichain<Timestamp>, PersistEpoch> {
2396        match self {
2397            Self::Critical(handle) => handle
2398                .compare_and_downgrade_since(
2399                    &Opaque::encode(expected),
2400                    (&Opaque::encode(opaque), since),
2401                )
2402                .await
2403                .map_err(|e| e.decode()),
2404            Self::Leased(handle) => {
2405                assert_none!(opaque.0);
2406
2407                handle.downgrade_since(since).await;
2408
2409                Ok(since.clone())
2410            }
2411        }
2412    }
2413
2414    pub async fn maybe_compare_and_downgrade_since(
2415        &mut self,
2416        expected: &PersistEpoch,
2417        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
2418    ) -> Option<Result<Antichain<Timestamp>, PersistEpoch>> {
2419        match self {
2420            Self::Critical(handle) => handle
2421                .maybe_compare_and_downgrade_since(
2422                    &Opaque::encode(expected),
2423                    (&Opaque::encode(opaque), since),
2424                )
2425                .await
2426                .map(|r| r.map_err(|o| o.decode())),
2427            Self::Leased(handle) => {
2428                assert_none!(opaque.0);
2429
2430                handle.maybe_downgrade_since(since).await;
2431
2432                Some(Ok(since.clone()))
2433            }
2434        }
2435    }
2436
2437    pub fn snapshot_stats(
2438        &self,
2439        id: GlobalId,
2440        as_of: Option<Antichain<Timestamp>>,
2441    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
2442        match self {
2443            Self::Critical(handle) => {
2444                let res = handle
2445                    .snapshot_stats(as_of)
2446                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2447                Box::pin(res)
2448            }
2449            Self::Leased(handle) => {
2450                let res = handle
2451                    .snapshot_stats(as_of)
2452                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2453                Box::pin(res)
2454            }
2455        }
2456    }
2457
2458    pub fn snapshot_stats_from_txn(
2459        &self,
2460        id: GlobalId,
2461        data_snapshot: DataSnapshot<Timestamp>,
2462    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
2463        match self {
2464            Self::Critical(handle) => Box::pin(
2465                data_snapshot
2466                    .snapshot_stats_from_critical(handle)
2467                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2468            ),
2469            Self::Leased(handle) => Box::pin(
2470                data_snapshot
2471                    .snapshot_stats_from_leased(handle)
2472                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2473            ),
2474        }
2475    }
2476}
2477
/// State maintained about individual collections.
#[derive(Debug, Clone)]
struct CollectionState {
    /// The primary of this collection.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    primary: Option<GlobalId>,

    /// Description of how this collection's frontier follows time.
    time_dependence: Option<TimeDependence>,

    /// The ID of the source remap/progress collection, if this is an ingestion.
    ingestion_remap_collection_id: Option<GlobalId>,

    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation will always contain `self.implied_capability`, but may
    /// also contain capabilities held by others who have read dependencies on
    /// this collection.
    pub read_capabilities: MutableAntichain<Timestamp>,

    /// The implicit capability associated with collection creation.  This
    /// should never be less than the since of the associated persist
    /// collection.
    pub implied_capability: Antichain<Timestamp>,

    /// The policy to use to downgrade `self.implied_capability`.
    pub read_policy: ReadPolicy,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<Timestamp>,

    /// Metadata describing where and how this collection is persisted.
    pub collection_metadata: CollectionMetadata,
}
2517
2518impl CollectionState {
2519    /// Creates a new collection state, with an initial read policy valid from
2520    /// `since`.
2521    pub fn new(
2522        primary: Option<GlobalId>,
2523        time_dependence: Option<TimeDependence>,
2524        ingestion_remap_collection_id: Option<GlobalId>,
2525        since: Antichain<Timestamp>,
2526        write_frontier: Antichain<Timestamp>,
2527        storage_dependencies: Vec<GlobalId>,
2528        metadata: CollectionMetadata,
2529    ) -> Self {
2530        let mut read_capabilities = MutableAntichain::new();
2531        read_capabilities.update_iter(since.iter().map(|time| (*time, 1)));
2532        Self {
2533            primary,
2534            time_dependence,
2535            ingestion_remap_collection_id,
2536            read_capabilities,
2537            implied_capability: since.clone(),
2538            read_policy: ReadPolicy::NoPolicy {
2539                initial_since: since,
2540            },
2541            storage_dependencies,
2542            write_frontier,
2543            collection_metadata: metadata,
2544        }
2545    }
2546
2547    /// Returns whether the collection was dropped.
2548    pub fn is_dropped(&self) -> bool {
2549        self.read_capabilities.is_empty()
2550    }
2551}
2552
/// A task that keeps persist handles, downgrades sinces when asked,
/// periodically gets recent uppers from them, and updates the shard collection
/// state when needed.
///
/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
#[derive(Debug)]
struct BackgroundTask {
    /// Storage configuration, shared with the owning [StorageCollectionsImpl].
    config: Arc<Mutex<StorageConfiguration>>,
    /// Sender side of our own command channel, passed along to
    /// `update_read_capabilities_inner` so that capability changes can enqueue
    /// follow-up commands.
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd>,
    /// Receiver for [BackgroundCmd]s, processed by the main loop in `run`.
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd>,
    /// Receiver for read-hold changes, batched and applied in `run`.
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<Timestamp>)>,
    /// Shards enqueued for finalization (written to by `downgrade_sinces` once
    /// a since reaches the empty frontier and finalization is enabled).
    finalizable_shards: Arc<ShardIdSet>,
    /// Collection state, shared with the owning [StorageCollectionsImpl].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,
    // So we know what shard ID corresponds to what global ID, which we need
    // when re-enqueing futures for determining the next upper update.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    /// Since handles for all registered collections, keyed by collection ID.
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper>,
    /// Write handle for the txns shard, if any; taken out at the start of
    /// `run` to drive the txns upper future.
    txns_handle: Option<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    /// IDs of collections whose data lives in the txns shard; their uppers
    /// advance together with the txns shard's upper.
    txns_shards: BTreeSet<GlobalId>,
}
2573
/// Commands processed by the [BackgroundTask] main loop.
#[derive(Debug)]
enum BackgroundCmd {
    /// Register persist handles for a collection, and start tracking its
    /// upper.
    Register {
        id: GlobalId,
        /// Whether the collection's data lives in the txns shard, in which
        /// case its upper is driven by the txns shard rather than its own
        /// write handle.
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        since_handle: SinceHandleWrapper,
    },
    /// Downgrade the sinces of the given collections to the given frontiers.
    DowngradeSince(Vec<(GlobalId, Antichain<Timestamp>)>),
    /// Request snapshot statistics for a collection at the given as-of; the
    /// resulting future is sent back on the provided channel.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf,
        oneshot::Sender<SnapshotStatsRes>,
    ),
}
2589
/// A newtype wrapper to hang a Debug impl off of.
///
/// Wraps the future returned for a [BackgroundCmd::SnapshotStats] request,
/// which the caller is expected to await.
pub(crate) struct SnapshotStatsRes(BoxFuture<'static, Result<SnapshotStats, StorageError>>);
2592
2593impl Debug for SnapshotStatsRes {
2594    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2595        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2596    }
2597}
2598
impl BackgroundTask {
    /// Runs this task until the command channel (`cmds_rx`) is closed.
    ///
    /// The `select!` loop concurrently:
    /// - waits for the txns shard's upper to advance and forwards the new
    ///   upper to all collections registered in `txns_shards`,
    /// - waits for upper advancements of all other registered shards,
    /// - processes incoming [BackgroundCmd]s, merging `DowngradeSince`
    ///   requests,
    /// - batches and applies read-hold changes arriving on `holds_rx`.
    async fn run(&mut self) {
        // Futures that fetch the recent upper from all other shards.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), Timestamp, StorageDiff>,
                                Antichain<Timestamp>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        // Returns a future that resolves with the handle and its new upper,
        // once the shard's upper has advanced past `prev_upper`.
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<Timestamp>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        // The txns shard, if present, gets a dedicated future. The
        // `GlobalId::Transient(1)` is a placeholder: the id is only threaded
        // through so the future can be re-enqueued below; it is never used as
        // a collection lookup key.
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            // No txns shard: this select arm never fires.
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                // The txns shard's upper advanced: all collections in the
                // txns system advance with it.
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                // A regular (non-txns) shard's upper advanced.
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            // Still current, so process the update and enqueue
                            // again!
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // Be polite and expire the write handle. This can
                            // happen when we get an upper update for a write
                            // handle that has since been replaced via Update.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        // We're done!
                        break;
                    };

                    // Drain all commands so we can merge `DowngradeSince` requests. Without this
                    // optimization, downgrading sinces could fall behind in the face of a large
                    // amount of storage collections.
                    let commands = iter::once(cmd).chain(
                        iter::from_fn(|| self.cmds_rx.try_recv().ok())
                    );
                    let mut downgrades = BTreeMap::<_, Antichain<_>>::new();
                    for cmd in commands {
                        match cmd {
                            BackgroundCmd::Register{
                                id,
                                is_in_txns,
                                write_handle,
                                since_handle
                            } => {
                                debug!("registering handles for {}", id);
                                let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                                if previous.is_some() {
                                    panic!("already registered a WriteHandle for collection {id}");
                                }

                                let previous = self.since_handles.insert(id, since_handle);
                                if previous.is_some() {
                                    panic!("already registered a SinceHandle for collection {id}");
                                }

                                if is_in_txns {
                                    // Uppers of txns collections are driven by
                                    // the txns shard future above, not by a
                                    // per-collection future.
                                    self.txns_shards.insert(id);
                                } else {
                                    let upper = write_handle.upper().clone();
                                    if !upper.is_empty() {
                                        let fut = gen_upper_future(id, write_handle, upper);
                                        upper_futures.push(fut.boxed());
                                    }
                                }
                            }
                            BackgroundCmd::DowngradeSince(cmds) => {
                                // Merge multiple downgrade requests for the
                                // same collection by joining their frontiers.
                                for (id, new) in cmds {
                                    downgrades.entry(id)
                                        .and_modify(|since| since.join_assign(&new))
                                        .or_insert(new);
                                }
                            }
                            BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                                // NB: The requested as_of could be arbitrarily far
                                // in the future. So, in order to avoid blocking
                                // this loop until it's available and the
                                // `snapshot_stats` call resolves, instead return
                                // the future to the caller and await it there.
                                let res = match self.since_handles.get(&id) {
                                    Some(x) => {
                                        let fut: BoxFuture<
                                            'static,
                                            Result<SnapshotStats, StorageError>,
                                        > = match as_of {
                                            SnapshotStatsAsOf::Direct(as_of) => {
                                                x.snapshot_stats(id, Some(as_of))
                                            }
                                            SnapshotStatsAsOf::Txns(data_snapshot) => {
                                                x.snapshot_stats_from_txn(id, data_snapshot)
                                            }
                                        };
                                        SnapshotStatsRes(fut)
                                    }
                                    None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                        StorageError::IdentifierMissing(id),
                                    )))),
                                };
                                // It's fine if the listener hung up.
                                let _ = tx.send(res);
                            }
                        }
                    }

                    if !downgrades.is_empty() {
                        self.downgrade_sinces(downgrades).await;
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    // Batch up all immediately available hold changes, merging
                    // changes that affect the same collection.
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

    /// Records new write frontiers for the given collections and downgrades
    /// their implied capabilities according to their read policies, forwarding
    /// any resulting capability changes via
    /// `update_read_capabilities_inner`.
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<Timestamp>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            // Write frontiers only ever advance.
            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            // Apply the read policy to the new write frontier to determine
            // the new implied capability.
            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                // Stage a +1 for each time in the new capability and a -1 for
                // each time in the old one; after the swap,
                // `new_read_capability` holds the old capability.
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

    /// Downgrades the sinces of the given collections to the given frontiers,
    /// issuing all persist calls concurrently.
    ///
    /// When a collection's since reaches the empty frontier, its handles are
    /// removed and its shard is enqueued for finalization (when enabled via
    /// config).
    async fn downgrade_sinces(&mut self, cmds: BTreeMap<GlobalId, Antichain<Timestamp>>) {
        // Process all persist calls concurrently.
        let mut futures = Vec::with_capacity(cmds.len());
        for (id, new_since) in cmds {
            // We need to take the since handles here, to satisfy the borrow checker.
            // We make sure to always put them back below.
            let Some(mut since_handle) = self.since_handles.remove(&id) else {
                // This can happen when someone concurrently drops a collection.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            let fut = async move {
                if id.is_user() {
                    trace!("downgrading since of {} to {:?}", id, new_since);
                }

                // NOTE(review): `SinceHandleWrapper::opaque` already returns
                // an owned `PersistEpoch`, so this `clone()` looks redundant.
                let epoch = since_handle.opaque().clone();
                let result = if new_since.is_empty() {
                    // A shard's since reaching the empty frontier is a prereq for
                    // being able to finalize a shard, so the final downgrade should
                    // never be rate-limited.
                    Some(
                        since_handle
                            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                            .await,
                    )
                } else {
                    since_handle
                        .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await
                };
                (id, since_handle, result)
            };
            futures.push(fut);
        }

        for (id, since_handle, result) in futures::future::join_all(futures).await {
            let new_since = match result {
                Some(Ok(since)) => Some(since),
                // Someone else holds the critical since handle: we have been
                // fenced out and must not continue.
                Some(Err(other_epoch)) => mz_ore::halt!(
                    "fenced by envd @ {other_epoch:?}. ours = {:?}",
                    since_handle.opaque(),
                ),
                None => None,
            };

            // Put back the handle we took above.
            self.since_handles.insert(id, since_handle);

            if new_since.is_some_and(|s| s.is_empty()) {
                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let Some(dropped_shard_id) = self.shard_by_id.remove(&id) else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                // We're not responsible for writes to tables, so we also don't
                // de-register them from the txn system. Whoever is responsible
                // will remove them. We only make sure to remove the table from
                // our tracking.
                self.txns_shards.remove(&id);

                if self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        %id, %dropped_shard_id,
                        "enqueuing shard finalization due to dropped collection and dropped \
                         persist handle",
                    );
                    self.finalizable_shards.lock().insert(dropped_shard_id);
                } else {
                    info!(
                        "not triggering shard finalization due to dropped storage object \
                         because enable_storage_shard_finalization parameter is false"
                    );
                }
            }
        }
    }
}
2939
/// Configuration for [`finalize_shards_task`], bundling everything the task
/// needs so it can be spawned with a single argument.
struct FinalizeShardsTaskConfig {
    /// Epoch of the owning environmentd process; used to build the opaque for
    /// critical since handles when force-downgrading during finalization.
    envd_epoch: NonZeroI64,
    /// Shared dynamic storage configuration; consulted on every tick for the
    /// `finalize_shards` parameter and related dyncfgs.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Metrics counting started/succeeded/failed shard finalizations.
    metrics: StorageCollectionsMetrics,
    /// Shards eligible for finalization; the task drains this set as shards
    /// are successfully finalized.
    finalizable_shards: Arc<ShardIdSet>,
    /// Shards that have been successfully finalized; the task inserts into
    /// this set.
    finalized_shards: Arc<ShardIdSet>,
    /// Location of the persist instance holding the shards.
    persist_location: PersistLocation,
    /// Cache used to open a persist client for `persist_location`.
    persist: Arc<PersistClientCache>,
    /// When true, the task exits immediately without finalizing anything.
    read_only: bool,
}
2950
2951async fn finalize_shards_task(
2952    FinalizeShardsTaskConfig {
2953        envd_epoch,
2954        config,
2955        metrics,
2956        finalizable_shards,
2957        finalized_shards,
2958        persist_location,
2959        persist,
2960        read_only,
2961    }: FinalizeShardsTaskConfig,
2962) {
2963    if read_only {
2964        info!("disabling shard finalization in read only mode");
2965        return;
2966    }
2967
2968    let mut interval = tokio::time::interval(Duration::from_secs(5));
2969    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
2970    loop {
2971        interval.tick().await;
2972
2973        if !config
2974            .lock()
2975            .expect("lock poisoned")
2976            .parameters
2977            .finalize_shards
2978        {
2979            debug!(
2980                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
2981            );
2982            continue;
2983        }
2984
2985        let current_finalizable_shards = {
2986            // We hold the lock for as short as possible and pull our cloned set
2987            // of shards.
2988            finalizable_shards.lock().iter().cloned().collect_vec()
2989        };
2990
2991        if current_finalizable_shards.is_empty() {
2992            debug!("no shards to finalize");
2993            continue;
2994        }
2995
2996        debug!(?current_finalizable_shards, "attempting to finalize shards");
2997
2998        // Open a persist client to delete unused shards.
2999        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3000
3001        let metrics = &metrics;
3002        let finalizable_shards = &finalizable_shards;
3003        let finalized_shards = &finalized_shards;
3004        let persist_client = &persist_client;
3005        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3006
3007        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3008            .get(config.lock().expect("lock poisoned").config_set());
3009
3010        let epoch = &PersistEpoch::from(envd_epoch);
3011
3012        futures::stream::iter(current_finalizable_shards.clone())
3013            .map(|shard_id| async move {
3014                let persist_client = persist_client.clone();
3015                let diagnostics = diagnostics.clone();
3016                let epoch = epoch.clone();
3017
3018                metrics.finalization_started.inc();
3019
3020                let is_finalized = persist_client
3021                    .is_finalized::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
3022                    .await
3023                    .expect("invalid persist usage");
3024
3025                if is_finalized {
3026                    debug!(%shard_id, "shard is already finalized!");
3027                    Some(shard_id)
3028                } else {
3029                    debug!(%shard_id, "finalizing shard");
3030                    let finalize = || async move {
3031                        // TODO: thread the global ID into the shard finalization WAL
3032                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3033
3034                        // We only use the writer to advance the upper, so using a dummy schema is
3035                        // fine.
3036                        let mut write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff> =
3037                            persist_client
3038                                .open_writer(
3039                                    shard_id,
3040                                    Arc::new(RelationDesc::empty()),
3041                                    Arc::new(UnitSchema),
3042                                    diagnostics,
3043                                )
3044                                .await
3045                                .expect("invalid persist usage");
3046                        write_handle.advance_upper(&Antichain::new()).await;
3047                        write_handle.expire().await;
3048
3049                        if force_downgrade_since {
3050                            let our_opaque = Opaque::encode(&epoch);
3051                            let mut since_handle: SinceHandle<
3052                                SourceData,
3053                                (),
3054                                Timestamp,
3055                                StorageDiff,
3056                            > = persist_client
3057                                .open_critical_since(
3058                                    shard_id,
3059                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
3060                                    our_opaque.clone(),
3061                                    Diagnostics::from_purpose("finalizing shards"),
3062                                )
3063                                .await
3064                                .expect("invalid persist usage");
3065                            let handle_opaque = since_handle.opaque().clone();
3066                            let opaque = if our_opaque.codec_name() == handle_opaque.codec_name()
3067                                && epoch.0 > handle_opaque.decode::<PersistEpoch>().0
3068                            {
3069                                // We're newer, but it's fine to use the
3070                                // handle's old epoch to try and downgrade.
3071                                handle_opaque
3072                            } else {
3073                                // Good luck, buddy! The downgrade below will
3074                                // not succeed. There's a process with a newer
3075                                // epoch out there and someone at some juncture
3076                                // will fence out this process.
3077                                // TODO: consider applying the downgrade no matter what!
3078                                our_opaque
3079                            };
3080                            let new_since = Antichain::new();
3081                            let downgrade = since_handle
3082                                .compare_and_downgrade_since(&opaque, (&opaque, &new_since))
3083                                .await;
3084                            if let Err(e) = downgrade {
3085                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
3086                                return Ok(());
3087                            }
3088                            // Not available now, so finalization is broken.
3089                            // since_handle.expire().await;
3090                        }
3091
3092                        persist_client
3093                            .finalize_shard::<SourceData, (), Timestamp, StorageDiff>(
3094                                shard_id,
3095                                Diagnostics::from_purpose("finalizing shards"),
3096                            )
3097                            .await
3098                    };
3099
3100                    match finalize().await {
3101                        Err(e) => {
3102                            // Rather than error, just leave this shard as
3103                            // one to finalize later.
3104                            warn!("error during finalization of shard {shard_id}: {e:?}");
3105                            None
3106                        }
3107                        Ok(()) => {
3108                            debug!(%shard_id, "finalize success!");
3109                            Some(shard_id)
3110                        }
3111                    }
3112                }
3113            })
3114            // Poll each future for each collection concurrently, maximum of 10
3115            // at a time.
3116            // TODO(benesch): the concurrency here should be configurable
3117            // via LaunchDarkly.
3118            .buffer_unordered(10)
3119            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3120            // The closure passed to `for_each` must remain fast or we risk
3121            // starving the finalization futures of calls to `poll`.
3122            .for_each(|shard_id| async move {
3123                match shard_id {
3124                    None => metrics.finalization_failed.inc(),
3125                    Some(shard_id) => {
3126                        // We make successfully finalized shards available for
3127                        // removal from the finalization WAL one by one, so that
3128                        // a handful of stuck shards don't prevent us from
3129                        // removing the shards that have made progress. The
3130                        // overhead of repeatedly acquiring and releasing the
3131                        // locks is negligible.
3132                        {
3133                            let mut finalizable_shards = finalizable_shards.lock();
3134                            let mut finalized_shards = finalized_shards.lock();
3135                            finalizable_shards.remove(&shard_id);
3136                            finalized_shards.insert(shard_id);
3137                        }
3138
3139                        metrics.finalization_succeeded.inc();
3140                    }
3141                }
3142            })
3143            .await;
3144
3145        debug!("done finalizing shards");
3146    }
3147}
3148
/// How the as-of frontier of a snapshot-stats request should be interpreted,
/// depending on how the underlying shard's upper advances.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf {
    /// Stats for a shard with an "eager" upper (one that continually advances
    /// as time passes, even if no writes are coming in).
    Direct(Antichain<Timestamp>),
    /// Stats for a shard with a "lazy" upper (one that only physically advances
    /// in response to writes).
    Txns(DataSnapshot<Timestamp>),
}
3158
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
    // Fix: a free test function cannot take `&self`; the stray receiver
    // parameter would not compile outside an `impl` block.
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&PersistEpoch::default()),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // No stats for unknown GlobalId.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats don't resolve for as_of past the upper.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Call it again because now_or_never consumed our future and it's not clone-able.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write some data.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write more data and unblock the ts 1 call
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        // Make sure it runs until at least here.
        drop(background_task);
    }

    /// Sends a `SnapshotStats` command to the background task for `id` at
    /// `as_of` and awaits the resulting stats.
    async fn snapshot_stats(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd>,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl BackgroundTask {
        /// Builds a `BackgroundTask` with in-memory/test wiring, returning the
        /// command sender alongside the task itself.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}