mz_storage_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the storage controller trait.
11
12use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Debug, Display};
15use std::num::NonZeroI64;
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use async_trait::async_trait;
21use chrono::{DateTime, DurationRound, TimeDelta, Utc};
22use derivative::Derivative;
23use differential_dataflow::lattice::Lattice;
24use futures::FutureExt;
25use futures::StreamExt;
26use itertools::Itertools;
27use mz_build_info::BuildInfo;
28use mz_cluster_client::client::ClusterReplicaLocation;
29use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
30use mz_cluster_client::{ReplicaId, WallclockLagFn};
31use mz_controller_types::dyncfgs::{
32    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
33    WALLCLOCK_LAG_RECORDING_INTERVAL,
34};
35use mz_ore::collections::CollectionExt;
36use mz_ore::metrics::MetricsRegistry;
37use mz_ore::now::{EpochMillis, NowFn};
38use mz_ore::task::AbortOnDropHandle;
39use mz_ore::{assert_none, instrument, soft_panic_or_log};
40use mz_persist_client::batch::ProtoBatch;
41use mz_persist_client::cache::PersistClientCache;
42use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
43use mz_persist_client::read::ReadHandle;
44use mz_persist_client::schema::CaESchema;
45use mz_persist_client::write::WriteHandle;
46use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
47use mz_persist_types::Codec64;
48use mz_persist_types::codec_impls::UnitSchema;
49use mz_proto::RustType;
50use mz_repr::adt::interval::Interval;
51use mz_repr::adt::timestamp::CheckedTimestamp;
52use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
53use mz_storage_client::client::{
54    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
55    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
56    TableData,
57};
58use mz_storage_client::controller::{
59    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
60    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
61    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLagHistogramPeriod,
62};
63use mz_storage_client::healthcheck::{
64    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
65    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
66};
67use mz_storage_client::metrics::StorageControllerMetrics;
68use mz_storage_client::statistics::{
69    SinkStatisticsUpdate, SourceStatisticsUpdate, WebhookStatistics,
70};
71use mz_storage_client::storage_collections::StorageCollections;
72use mz_storage_types::configuration::StorageConfiguration;
73use mz_storage_types::connections::ConnectionContext;
74use mz_storage_types::connections::inline::InlinedConnection;
75use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
76use mz_storage_types::instances::StorageInstanceId;
77use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
78use mz_storage_types::parameters::StorageParameters;
79use mz_storage_types::read_holds::ReadHold;
80use mz_storage_types::read_policy::ReadPolicy;
81use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
82use mz_storage_types::sources::{
83    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
84    SourceExport, SourceExportDataConfig,
85};
86use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
87use mz_txn_wal::metrics::Metrics as TxnMetrics;
88use mz_txn_wal::txn_read::TxnsRead;
89use mz_txn_wal::txns::TxnsHandle;
90use timely::order::{PartialOrder, TotalOrder};
91use timely::progress::Timestamp as TimelyTimestamp;
92use timely::progress::frontier::MutableAntichain;
93use timely::progress::{Antichain, ChangeBatch, Timestamp};
94use tokio::sync::watch::{Sender, channel};
95use tokio::sync::{mpsc, oneshot};
96use tokio::time::MissedTickBehavior;
97use tokio::time::error::Elapsed;
98use tracing::{debug, info, warn};
99
100use crate::collection_mgmt::{
101    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
102};
103use crate::instance::{Instance, ReplicaConfig};
104use crate::statistics::StatsState;
105
106mod collection_mgmt;
107mod history;
108mod instance;
109mod persist_handles;
110mod rtr;
111mod statistics;
112
113#[derive(Derivative)]
114#[derivative(Debug)]
115struct PendingOneshotIngestion {
116    /// Callback used to provide results of the ingestion.
117    #[derivative(Debug = "ignore")]
118    result_tx: OneshotResultCallback<ProtoBatch>,
119    /// Cluster currently running this ingestion
120    cluster_id: StorageInstanceId,
121}
122
123impl PendingOneshotIngestion {
124    /// Consume the pending ingestion, responding with a cancelation message.
125    ///
126    /// TODO(cf2): Refine these error messages so they're not stringly typed.
127    pub(crate) fn cancel(self) {
128        (self.result_tx)(vec![Err("canceled".to_string())])
129    }
130}
131
132/// A storage controller for a storage instance.
133#[derive(Derivative)]
134#[derivative(Debug)]
135pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
136{
137    /// The build information for this process.
138    build_info: &'static BuildInfo,
139    /// A function that returns the current time.
140    now: NowFn,
141    /// The fencing token for this instance of the controller.
142    envd_epoch: NonZeroI64,
143
144    /// Whether or not this controller is in read-only mode.
145    ///
146    /// When in read-only mode, neither this controller nor the instances
147    /// controlled by it are allowed to affect changes to external systems
148    /// (largely persist).
149    read_only: bool,
150
151    /// Collections maintained by the storage controller.
152    ///
153    /// This collection only grows, although individual collections may be rendered unusable.
154    /// This is to prevent the re-binding of identifiers to other descriptions.
155    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,
156
157    /// Map from IDs of objects that have been dropped to replicas we are still
158    /// expecting DroppedId messages from. This is cleared out once all replicas
159    /// have responded.
160    ///
161    /// We use this only to catch problems in the protocol between controller
162    /// and replicas, for example we can differentiate between late messages for
163    /// objects that have already been dropped and unexpected (read erroneous)
164    /// messages from the replica.
165    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,
166
167    /// Write handle for table shards.
168    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
169    /// A shared TxnsCache running in a task and communicated with over a channel.
170    txns_read: TxnsRead<T>,
171    txns_metrics: Arc<TxnMetrics>,
172    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
173    /// Channel for sending table handle drops.
174    #[derivative(Debug = "ignore")]
175    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
176    /// Channel for receiving table handle drops.
177    #[derivative(Debug = "ignore")]
178    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
179    /// Closures that can be used to send responses from oneshot ingestions.
180    #[derivative(Debug = "ignore")]
181    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,
182
183    /// Interface for managed collections
184    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
185
186    /// Tracks which collection is responsible for which [`IntrospectionType`].
187    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
188    /// Tokens for tasks that drive updating introspection collections. Dropping
189    /// this will make sure that any tasks (or other resources) will stop when
190    /// needed.
191    // TODO(aljoscha): Should these live somewhere else?
192    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
193
194    // The following two fields must always be locked in order.
195    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
196    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s, as well
197    /// as webhook statistics.
198    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
199    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
200    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s.
201    sink_statistics: Arc<Mutex<BTreeMap<GlobalId, statistics::StatsState<SinkStatisticsUpdate>>>>,
202    /// A way to update the statistics interval in the statistics tasks.
203    statistics_interval_sender: Sender<Duration>,
204
205    /// Clients for all known storage instances.
206    instances: BTreeMap<StorageInstanceId, Instance<T>>,
207    /// Set to `true` once `initialization_complete` has been called.
208    initialized: bool,
209    /// Storage configuration to apply to newly provisioned instances, and use during purification.
210    config: StorageConfiguration,
211    /// The persist location where all storage collections are being written to
212    persist_location: PersistLocation,
213    /// A persist client used to write to storage collections
214    persist: Arc<PersistClientCache>,
215    /// Metrics of the Storage controller
216    metrics: StorageControllerMetrics,
217    /// `(read, write)` frontiers that have been recorded in the `Frontiers` collection, kept to be
218    /// able to retract old rows.
219    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
220    /// Write frontiers that have been recorded in the `ReplicaFrontiers` collection, kept to be
221    /// able to retract old rows.
222    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,
223
224    /// A function that computes the lag between the given time and wallclock time.
225    #[derivative(Debug = "ignore")]
226    wallclock_lag: WallclockLagFn<T>,
227    /// The last time wallclock lag introspection was recorded.
228    wallclock_lag_last_recorded: DateTime<Utc>,
229
230    /// Handle to a [StorageCollections].
231    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
232    /// Migrated storage collections that can be written even in read only mode.
233    migrated_storage_collections: BTreeSet<GlobalId>,
234
235    /// Ticker for scheduling periodic maintenance work.
236    maintenance_ticker: tokio::time::Interval,
237    /// Whether maintenance work was scheduled.
238    maintenance_scheduled: bool,
239
240    /// Shared transmit channel for replicas to send responses.
241    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
242    /// Receive end for replica responses.
243    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,
244
245    /// Background task run at startup to warm persist state.
246    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
247}
248
249/// Warm up persist state for `shard_ids` in a background task.
250///
251/// With better parallelism during startup this would likely be unnecessary, but empirically we see
252/// some nice speedups with this relatively simple function.
253fn warm_persist_state_in_background(
254    client: PersistClient,
255    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
256) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
257    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
258    const MAX_CONCURRENT_WARMS: usize = 16;
259    let logic = async move {
260        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
261            .map(|shard_id| {
262                let client = client.clone();
263                async move {
264                    client
265                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
266                            shard_id,
267                            Arc::new(RelationDesc::empty()),
268                            Arc::new(UnitSchema),
269                            true,
270                            Diagnostics::from_purpose("warm persist load state"),
271                        )
272                        .await
273                }
274            })
275            .buffer_unordered(MAX_CONCURRENT_WARMS)
276            .collect()
277            .await;
278        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
279        fetchers
280    };
281    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
282}
283
284#[async_trait(?Send)]
285impl<T> StorageController for Controller<T>
286where
287    T: Timestamp
288        + Lattice
289        + TotalOrder
290        + Codec64
291        + From<EpochMillis>
292        + TimestampManipulation
293        + Into<Datum<'static>>
294        + Display,
295    StorageCommand<T>: RustType<ProtoStorageCommand>,
296    StorageResponse<T>: RustType<ProtoStorageResponse>,
297{
298    type Timestamp = T;
299
300    fn initialization_complete(&mut self) {
301        self.reconcile_dangling_statistics();
302        self.initialized = true;
303
304        for instance in self.instances.values_mut() {
305            instance.send(StorageCommand::InitializationComplete);
306        }
307    }
308
309    fn update_parameters(&mut self, config_params: StorageParameters) {
310        self.storage_collections
311            .update_parameters(config_params.clone());
312
313        // We serialize the dyncfg updates in StorageParameters, but configure
314        // persist separately.
315        self.persist.cfg().apply_from(&config_params.dyncfg_updates);
316
317        for instance in self.instances.values_mut() {
318            let params = Box::new(config_params.clone());
319            instance.send(StorageCommand::UpdateConfiguration(params));
320        }
321        self.config.update(config_params);
322        self.statistics_interval_sender
323            .send_replace(self.config.parameters.statistics_interval);
324        self.collection_manager.update_user_batch_duration(
325            self.config
326                .parameters
327                .user_storage_managed_collections_batch_duration,
328        );
329    }
330
331    /// Get the current configuration
332    fn config(&self) -> &StorageConfiguration {
333        &self.config
334    }
335
336    fn collection_metadata(
337        &self,
338        id: GlobalId,
339    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
340        self.storage_collections.collection_metadata(id)
341    }
342
343    fn collection_hydrated(
344        &self,
345        collection_id: GlobalId,
346    ) -> Result<bool, StorageError<Self::Timestamp>> {
347        let collection = self.collection(collection_id)?;
348
349        let instance_id = match &collection.data_source {
350            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
351            DataSource::IngestionExport { ingestion_id, .. } => {
352                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
353
354                let instance_id = match &ingestion_state.data_source {
355                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
356                    _ => unreachable!("SourceExport must only refer to primary source"),
357                };
358
359                instance_id
360            }
361            _ => return Ok(true),
362        };
363
364        let instance = self.instances.get(&instance_id).ok_or_else(|| {
365            StorageError::IngestionInstanceMissing {
366                storage_instance_id: instance_id,
367                ingestion_id: collection_id,
368            }
369        })?;
370
371        if instance.replica_ids().next().is_none() {
372            // Ingestions on zero-replica clusters are always considered
373            // hydrated.
374            return Ok(true);
375        }
376
377        match &collection.extra_state {
378            CollectionStateExtra::Ingestion(ingestion_state) => {
379                // An ingestion is hydrated if it is hydrated on at least one replica.
380                Ok(ingestion_state.hydrated_on.len() >= 1)
381            }
382            CollectionStateExtra::Export(_) => {
383                // For now, sinks are always considered hydrated. We rely on
384                // them starting up "instantly" and don't wait for them when
385                // checking hydration status of a replica.  TODO(sinks): base
386                // this off of the sink shard's frontier?
387                Ok(true)
388            }
389            CollectionStateExtra::None => {
390                // For now, objects that are not ingestions are always
391                // considered hydrated. This is tables and webhooks, as of
392                // today.
393                Ok(true)
394            }
395        }
396    }
397
398    #[mz_ore::instrument(level = "debug")]
399    fn collections_hydrated_on_replicas(
400        &self,
401        target_replica_ids: Option<Vec<ReplicaId>>,
402        exclude_collections: &BTreeSet<GlobalId>,
403    ) -> Result<bool, StorageError<Self::Timestamp>> {
404        // If target_replica_ids is provided, use it as the set of target
405        // replicas. Otherwise check for hydration on any replica.
406        let target_replicas: Option<BTreeSet<ReplicaId>> =
407            target_replica_ids.map(|ids| ids.into_iter().collect());
408
409        let mut all_hydrated = true;
410        for (collection_id, collection_state) in self.collections.iter() {
411            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
412                continue;
413            }
414            let hydrated = match &collection_state.extra_state {
415                CollectionStateExtra::Ingestion(state) => {
416                    match &target_replicas {
417                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
418                        None => {
419                            // Not target replicas, so check that it's hydrated
420                            // on at least one replica.
421                            state.hydrated_on.len() >= 1
422                        }
423                    }
424                }
425                CollectionStateExtra::Export(_) => {
426                    // For now, sinks are always considered hydrated. We rely on
427                    // them starting up "instantly" and don't wait for them when
428                    // checking hydration status of a replica.  TODO(sinks):
429                    // base this off of the sink shard's frontier?
430                    true
431                }
432                CollectionStateExtra::None => {
433                    // For now, objects that are not ingestions are always
434                    // considered hydrated. This is tables and webhooks, as of
435                    // today.
436                    true
437                }
438            };
439            if !hydrated {
440                tracing::info!(%collection_id, "collection is not hydrated on any replica");
441                all_hydrated = false;
442                // We continue with our loop instead of breaking out early, so
443                // that we log all non-hydrated replicas.
444            }
445        }
446        Ok(all_hydrated)
447    }
448
449    fn collection_frontiers(
450        &self,
451        id: GlobalId,
452    ) -> Result<
453        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
454        StorageError<Self::Timestamp>,
455    > {
456        let frontiers = self.storage_collections.collection_frontiers(id)?;
457        Ok((frontiers.implied_capability, frontiers.write_frontier))
458    }
459
460    fn collections_frontiers(
461        &self,
462        mut ids: Vec<GlobalId>,
463    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
464        let mut result = vec![];
465        // In theory, we could pull all our frontiers from storage collections...
466        // but in practice those frontiers may not be identical. For historical reasons, we use the
467        // locally-tracked frontier for sinks but the storage-collections-maintained frontier for
468        // sources.
469        ids.retain(|&id| match self.export(id) {
470            Ok(export) => {
471                result.push((
472                    id,
473                    export.input_hold().since().clone(),
474                    export.write_frontier.clone(),
475                ));
476                false
477            }
478            Err(_) => true,
479        });
480        result.extend(
481            self.storage_collections
482                .collections_frontiers(ids)?
483                .into_iter()
484                .map(|frontiers| {
485                    (
486                        frontiers.id,
487                        frontiers.implied_capability,
488                        frontiers.write_frontier,
489                    )
490                }),
491        );
492
493        Ok(result)
494    }
495
496    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
497        self.storage_collections.active_collection_metadatas()
498    }
499
500    fn active_ingestions(
501        &self,
502        instance_id: StorageInstanceId,
503    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
504        Box::new(self.instances[&instance_id].active_ingestions())
505    }
506
507    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
508        self.storage_collections.check_exists(id)
509    }
510
511    fn create_instance(&mut self, id: StorageInstanceId) {
512        let metrics = self.metrics.for_instance(id);
513        let mut instance = Instance::new(
514            self.envd_epoch,
515            metrics,
516            Arc::clone(self.config().config_set()),
517            self.now.clone(),
518            self.instance_response_tx.clone(),
519        );
520        if self.initialized {
521            instance.send(StorageCommand::InitializationComplete);
522        }
523        if !self.read_only {
524            instance.send(StorageCommand::AllowWrites);
525        }
526
527        let params = Box::new(self.config.parameters.clone());
528        instance.send(StorageCommand::UpdateConfiguration(params));
529
530        let old_instance = self.instances.insert(id, instance);
531        assert_none!(old_instance, "storage instance {id} already exists");
532    }
533
534    fn drop_instance(&mut self, id: StorageInstanceId) {
535        let instance = self.instances.remove(&id);
536        assert!(instance.is_some(), "storage instance {id} does not exist");
537    }
538
539    fn update_instance_workload_class(
540        &mut self,
541        id: StorageInstanceId,
542        workload_class: Option<String>,
543    ) {
544        let instance = self
545            .instances
546            .get_mut(&id)
547            .unwrap_or_else(|| panic!("instance {id} does not exist"));
548
549        instance.workload_class = workload_class;
550    }
551
552    fn connect_replica(
553        &mut self,
554        instance_id: StorageInstanceId,
555        replica_id: ReplicaId,
556        location: ClusterReplicaLocation,
557    ) {
558        let instance = self
559            .instances
560            .get_mut(&instance_id)
561            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
562
563        let config = ReplicaConfig {
564            build_info: self.build_info,
565            location,
566            grpc_client: self.config.parameters.grpc_client.clone(),
567        };
568        instance.add_replica(replica_id, config);
569    }
570
571    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
572        let instance = self
573            .instances
574            .get_mut(&instance_id)
575            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
576
577        let status_now = mz_ore::now::to_datetime((self.now)());
578        let mut source_status_updates = vec![];
579        let mut sink_status_updates = vec![];
580
581        let make_update = |id, object_type| StatusUpdate {
582            id,
583            status: Status::Paused,
584            timestamp: status_now,
585            error: None,
586            hints: BTreeSet::from([format!(
587                "The replica running this {object_type} has been dropped"
588            )]),
589            namespaced_errors: Default::default(),
590            replica_id: Some(replica_id),
591        };
592
593        for id in instance.active_ingestions() {
594            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
595                active_replicas.remove(&replica_id);
596                if active_replicas.is_empty() {
597                    self.dropped_objects.remove(id);
598                }
599            }
600
601            let ingestion = self
602                .collections
603                .get_mut(id)
604                .expect("instance contains unknown ingestion");
605
606            let ingestion_description = match &ingestion.data_source {
607                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
608                _ => panic!(
609                    "unexpected data source for ingestion: {:?}",
610                    ingestion.data_source
611                ),
612            };
613
614            // NOTE(aljoscha): We filter out the remap collection because we
615            // don't get any status updates about it from the replica side. So
616            // we don't want to synthesize a 'paused' status here.
617            //
618            // TODO(aljoscha): I think we want to fix this eventually, and make
619            // sure we get status updates for the remap shard as well. Currently
620            // its handling in the source status collection is a bit difficult
621            // because we don't have updates for it in the status history
622            // collection.
623            let subsource_ids = ingestion_description
624                .collection_ids()
625                .filter(|id| id != &ingestion_description.remap_collection_id);
626            for id in subsource_ids {
627                source_status_updates.push(make_update(id, "source"));
628            }
629        }
630
631        for id in instance.active_exports() {
632            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
633                active_replicas.remove(&replica_id);
634                if active_replicas.is_empty() {
635                    self.dropped_objects.remove(id);
636                }
637            }
638
639            sink_status_updates.push(make_update(*id, "sink"));
640        }
641
642        instance.drop_replica(replica_id);
643
644        if !self.read_only {
645            if !source_status_updates.is_empty() {
646                self.append_status_introspection_updates(
647                    IntrospectionType::SourceStatusHistory,
648                    source_status_updates,
649                );
650            }
651            if !sink_status_updates.is_empty() {
652                self.append_status_introspection_updates(
653                    IntrospectionType::SinkStatusHistory,
654                    sink_status_updates,
655                );
656            }
657        }
658    }
659
660    async fn evolve_nullability_for_bootstrap(
661        &mut self,
662        storage_metadata: &StorageMetadata,
663        collections: Vec<(GlobalId, RelationDesc)>,
664    ) -> Result<(), StorageError<Self::Timestamp>> {
665        let persist_client = self
666            .persist
667            .open(self.persist_location.clone())
668            .await
669            .unwrap();
670
671        for (global_id, relation_desc) in collections {
672            let shard_id = storage_metadata.get_collection_shard(global_id)?;
673            let diagnostics = Diagnostics {
674                shard_name: global_id.to_string(),
675                handle_purpose: "evolve nullability for bootstrap".to_string(),
676            };
677            let latest_schema = persist_client
678                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
679                .await
680                .expect("invalid persist usage");
681            let Some((schema_id, current_schema, _)) = latest_schema else {
682                tracing::debug!(?global_id, "no schema registered");
683                continue;
684            };
685            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");
686
687            let diagnostics = Diagnostics {
688                shard_name: global_id.to_string(),
689                handle_purpose: "evolve nullability for bootstrap".to_string(),
690            };
691            let evolve_result = persist_client
692                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
693                    shard_id,
694                    schema_id,
695                    &relation_desc,
696                    &UnitSchema,
697                    diagnostics,
698                )
699                .await
700                .expect("invalid persist usage");
701            match evolve_result {
702                CaESchema::Ok(_) => (),
703                CaESchema::ExpectedMismatch {
704                    schema_id,
705                    key,
706                    val: _,
707                } => {
708                    return Err(StorageError::PersistSchemaEvolveRace {
709                        global_id,
710                        shard_id,
711                        schema_id,
712                        relation_desc: key,
713                    });
714                }
715                CaESchema::Incompatible => {
716                    return Err(StorageError::PersistInvalidSchemaEvolve {
717                        global_id,
718                        shard_id,
719                    });
720                }
721            };
722        }
723
724        Ok(())
725    }
726
727    /// Create and "execute" the described collection.
728    ///
729    /// "Execute" is in scare quotes because what executing a collection means
730    /// varies widely based on the type of collection you're creating.
731    ///
732    /// The general process creating a collection undergoes is:
733    /// 1. Enrich the description we get from the user with the metadata only
734    ///    the storage controller's metadata. This is mostly a matter of
735    ///    separating concerns.
736    /// 2. Generate write and read persist handles for the collection.
737    /// 3. Store the collection's metadata in the appropriate field.
738    /// 4. "Execute" the collection. What that means is contingent on the type of
739    ///    collection. so consult the code for more details.
740    ///
741    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
742    // a method/move individual parts to their own methods. @guswynn observes
743    // that a number of these operations could be moved into fns on
744    // `DataSource`.
745    #[instrument(name = "storage::create_collections")]
746    async fn create_collections_for_bootstrap(
747        &mut self,
748        storage_metadata: &StorageMetadata,
749        register_ts: Option<Self::Timestamp>,
750        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
751        migrated_storage_collections: &BTreeSet<GlobalId>,
752    ) -> Result<(), StorageError<Self::Timestamp>> {
753        self.migrated_storage_collections
754            .extend(migrated_storage_collections.iter().cloned());
755
756        self.storage_collections
757            .create_collections_for_bootstrap(
758                storage_metadata,
759                register_ts.clone(),
760                collections.clone(),
761                migrated_storage_collections,
762            )
763            .await?;
764
765        // At this point we're connected to all the collection shards in persist. Our warming task
766        // is no longer useful, so abort it if it's still running.
767        drop(self.persist_warm_task.take());
768
769        // Validate first, to avoid corrupting state.
770        // 1. create a dropped identifier, or
771        // 2. create an existing identifier with a new description.
772        // Make sure to check for errors within `ingestions` as well.
773        collections.sort_by_key(|(id, _)| *id);
774        collections.dedup();
775        for pos in 1..collections.len() {
776            if collections[pos - 1].0 == collections[pos].0 {
777                return Err(StorageError::CollectionIdReused(collections[pos].0));
778            }
779        }
780
781        // We first enrich each collection description with some additional metadata...
782        let enriched_with_metadata = collections
783            .into_iter()
784            .map(|(id, description)| {
785                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
786
787                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
788                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
789                    Ok(shard)
790                };
791
792                let remap_shard = match &description.data_source {
793                    // Only ingestions can have remap shards.
794                    DataSource::Ingestion(IngestionDescription {
795                        remap_collection_id,
796                        ..
797                    }) => {
798                        // Iff ingestion has a remap collection, its metadata must
799                        // exist (and be correct) by this point.
800                        Some(get_shard(*remap_collection_id)?)
801                    }
802                    _ => None,
803                };
804
805                // If the shard is being managed by txn-wal (initially, tables), then we need to
806                // pass along the shard id for the txns shard to dataflow rendering.
807                let txns_shard = description
808                    .data_source
809                    .in_txns()
810                    .then(|| *self.txns_read.txns_id());
811
812                let metadata = CollectionMetadata {
813                    persist_location: self.persist_location.clone(),
814                    remap_shard,
815                    data_shard,
816                    relation_desc: description.desc.clone(),
817                    txns_shard,
818                };
819
820                Ok((id, description, metadata))
821            })
822            .collect_vec();
823
824        // So that we can open persist handles for each collections concurrently.
825        let persist_client = self
826            .persist
827            .open(self.persist_location.clone())
828            .await
829            .unwrap();
830        let persist_client = &persist_client;
831
832        // Reborrow the `&mut self` as immutable, as all the concurrent work to be processed in
833        // this stream cannot all have exclusive access.
834        use futures::stream::{StreamExt, TryStreamExt};
835        let this = &*self;
836        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
837            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
838                async move {
839                    let (id, description, metadata) = data?;
840
841                    // should be replaced with real introspection (https://github.com/MaterializeInc/database-issues/issues/4078)
842                    // but for now, it's helpful to have this mapping written down somewhere
843                    debug!(
844                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
845                        id, metadata.remap_shard, metadata.data_shard
846                    );
847
848                    let write = this
849                        .open_data_handles(
850                            &id,
851                            metadata.data_shard,
852                            metadata.relation_desc.clone(),
853                            persist_client,
854                        )
855                        .await;
856
857                    Ok::<_, StorageError<T>>((id, description, write, metadata))
858                }
859            })
860            // Poll each future for each collection concurrently, maximum of 50 at a time.
861            .buffer_unordered(50)
862            // HERE BE DRAGONS:
863            //
864            // There are at least 2 subtleties in using `FuturesUnordered` (which
865            // `buffer_unordered` uses underneath:
866            // - One is captured here <https://github.com/rust-lang/futures-rs/issues/2387>
867            // - And the other is deadlocking if processing an OUTPUT of a `FuturesUnordered`
868            // stream attempts to obtain an async mutex that is also obtained in the futures
869            // being polled.
870            //
871            // Both of these could potentially be issues in all usages of `buffer_unordered` in
872            // this method, so we stick the standard advice: only use `try_collect` or
873            // `collect`!
874            .try_collect()
875            .await?;
876
877        // The set of collections that we should render at the end of this
878        // function.
879        let mut to_execute = BTreeSet::new();
880        // New collections that are being created; this is distinct from the set
881        // of collections we plan to execute because
882        // `DataSource::IngestionExport` is added as a new collection, but is
883        // not executed directly.
884        let mut new_collections = BTreeSet::new();
885        let mut table_registers = Vec::with_capacity(to_register.len());
886
887        // Reorder in dependency order.
888        to_register.sort_by_key(|(id, ..)| *id);
889
890        // Register tables first, but register them in reverse order since earlier tables
891        // can depend on later tables.
892        //
893        // Note: We could do more complex sorting to avoid the allocations, but IMO it's
894        // easier to reason about it this way.
895        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
896            .into_iter()
897            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
898        let to_register = tables_to_register
899            .into_iter()
900            .rev()
901            .chain(collections_to_register.into_iter());
902
903        // Statistics need a level of indirection so we can mutably borrow
904        // `self` when registering collections and when we are inserting
905        // statistics.
906        let mut new_source_statistic_entries = BTreeSet::new();
907        let mut new_webhook_statistic_entries = BTreeSet::new();
908        let mut new_sink_statistic_entries = BTreeSet::new();
909
910        for (id, description, write, metadata) in to_register {
911            let is_in_txns = |id, metadata: &CollectionMetadata| {
912                metadata.txns_shard.is_some()
913                    && !(self.read_only && migrated_storage_collections.contains(&id))
914            };
915
916            let mut data_source = description.data_source;
917
918            to_execute.insert(id);
919            new_collections.insert(id);
920
921            // Ensure that the ingestion has an export for its primary source if applicable.
922            // This is done in an awkward spot to appease the borrow checker.
923            // TODO(database-issues#8620): This will be removed once sources no longer export
924            // to primary collections and only export to explicit SourceExports (tables).
925            if let DataSource::Ingestion(ingestion) = &mut data_source {
926                let export = ingestion.desc.primary_source_export();
927                ingestion.source_exports.insert(id, export);
928            }
929
930            let write_frontier = write.upper();
931
932            // Determine if this collection has another dependency.
933            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;
934
935            let dependency_read_holds = self
936                .storage_collections
937                .acquire_read_holds(storage_dependencies)
938                .expect("can acquire read holds");
939
940            let mut dependency_since = Antichain::from_elem(T::minimum());
941            for read_hold in dependency_read_holds.iter() {
942                dependency_since.join_assign(read_hold.since());
943            }
944
945            // Assert some invariants.
946            //
947            // TODO(alter_table): Include Tables (is_in_txns) in this check. After
948            // supporting ALTER TABLE, it's now possible for a table to have a dependency
949            // and thus run this check. But tables are managed by txn-wal and thus the
950            // upper of the shard's `write_handle` generally isn't the logical upper of
951            // the shard. Instead we need to thread through the upper of the `txn` shard
952            // here so we can check this invariant.
953            if !dependency_read_holds.is_empty()
954                && !is_in_txns(id, &metadata)
955                && !matches!(&data_source, DataSource::Sink { .. })
956            {
957                // The dependency since cannot be beyond the dependent (our)
958                // upper unless the collection is new. In practice, the
959                // depdenency is the remap shard of a source (export), and if
960                // the since is allowed to "catch up" to the upper, that is
961                // `upper <= since`, a restarting ingestion cannot differentiate
962                // between updates that have already been written out to the
963                // backing persist shard and updates that have yet to be
964                // written. We would write duplicate updates.
965                //
966                // If this check fails, it means that the read hold installed on
967                // the dependency was probably not upheld –– if it were, the
968                // dependency's since could not have advanced as far the
969                // dependent's upper.
970                //
971                // We don't care about the dependency since when the write
972                // frontier is empty. In that case, no-one can write down any
973                // more updates.
974                mz_ore::soft_assert_or_log!(
975                    write_frontier.elements() == &[T::minimum()]
976                        || write_frontier.is_empty()
977                        || PartialOrder::less_than(&dependency_since, write_frontier),
978                    "dependency since has advanced past dependent ({id}) upper \n
979                            dependent ({id}): upper {:?} \n
980                            dependency since {:?} \n
981                            dependency read holds: {:?}",
982                    write_frontier,
983                    dependency_since,
984                    dependency_read_holds,
985                );
986            }
987
988            // Perform data source-specific setup.
989            let mut extra_state = CollectionStateExtra::None;
990            let mut maybe_instance_id = None;
991            match &data_source {
992                DataSource::Introspection(typ) => {
993                    debug!(
994                        ?data_source, meta = ?metadata,
995                        "registering {id} with persist monotonic worker",
996                    );
997                    // We always register the collection with the collection manager,
998                    // regardless of read-only mode. The CollectionManager itself is
999                    // aware of read-only mode and will not attempt to write before told
1000                    // to do so.
1001                    //
1002                    self.register_introspection_collection(
1003                        id,
1004                        *typ,
1005                        write,
1006                        persist_client.clone(),
1007                    )?;
1008                }
1009                DataSource::Webhook => {
1010                    debug!(
1011                        ?data_source, meta = ?metadata,
1012                        "registering {id} with persist monotonic worker",
1013                    );
1014                    new_source_statistic_entries.insert(id);
1015                    // This collection of statistics is periodically aggregated into
1016                    // `source_statistics`.
1017                    new_webhook_statistic_entries.insert(id);
1018                    // Register the collection so our manager knows about it.
1019                    //
1020                    // NOTE: Maybe this shouldn't be in the collection manager,
1021                    // and collection manager should only be responsible for
1022                    // built-in introspection collections?
1023                    self.collection_manager
1024                        .register_append_only_collection(id, write, false, None);
1025                }
1026                DataSource::IngestionExport {
1027                    ingestion_id,
1028                    details,
1029                    data_config,
1030                } => {
1031                    debug!(
1032                        ?data_source, meta = ?metadata,
1033                        "not registering {id} with a controller persist worker",
1034                    );
1035                    // Adjust the source to contain this export.
1036                    let ingestion_state = self
1037                        .collections
1038                        .get_mut(ingestion_id)
1039                        .expect("known to exist");
1040
1041                    let instance_id = match &mut ingestion_state.data_source {
1042                        DataSource::Ingestion(ingestion_desc) => {
1043                            ingestion_desc.source_exports.insert(
1044                                id,
1045                                SourceExport {
1046                                    storage_metadata: (),
1047                                    details: details.clone(),
1048                                    data_config: data_config.clone(),
1049                                },
1050                            );
1051
1052                            // Record the ingestion's cluster ID for the
1053                            // ingestion export. This way we always have a
1054                            // record of it, even if the ingestion's collection
1055                            // description disappears.
1056                            ingestion_desc.instance_id
1057                        }
1058                        _ => unreachable!(
1059                            "SourceExport must only refer to primary sources that already exist"
1060                        ),
1061                    };
1062
1063                    // Executing the source export doesn't do anything, ensure we execute the source instead.
1064                    to_execute.remove(&id);
1065                    to_execute.insert(*ingestion_id);
1066
1067                    let ingestion_state = IngestionState {
1068                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
1069                        dependency_read_holds,
1070                        derived_since: dependency_since,
1071                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1072                        hold_policy: ReadPolicy::step_back(),
1073                        instance_id,
1074                        hydrated_on: BTreeSet::new(),
1075                    };
1076
1077                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1078                    maybe_instance_id = Some(instance_id);
1079
1080                    new_source_statistic_entries.insert(id);
1081                }
1082                DataSource::Table { .. } => {
1083                    debug!(
1084                        ?data_source, meta = ?metadata,
1085                        "registering {id} with persist table worker",
1086                    );
1087                    table_registers.push((id, write));
1088                }
1089                DataSource::Progress | DataSource::Other => {
1090                    debug!(
1091                        ?data_source, meta = ?metadata,
1092                        "not registering {id} with a controller persist worker",
1093                    );
1094                }
1095                DataSource::Ingestion(ingestion_desc) => {
1096                    debug!(
1097                        ?data_source, meta = ?metadata,
1098                        "not registering {id} with a controller persist worker",
1099                    );
1100
1101                    let mut dependency_since = Antichain::from_elem(T::minimum());
1102                    for read_hold in dependency_read_holds.iter() {
1103                        dependency_since.join_assign(read_hold.since());
1104                    }
1105
1106                    let ingestion_state = IngestionState {
1107                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
1108                        dependency_read_holds,
1109                        derived_since: dependency_since,
1110                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1111                        hold_policy: ReadPolicy::step_back(),
1112                        instance_id: ingestion_desc.instance_id,
1113                        hydrated_on: BTreeSet::new(),
1114                    };
1115
1116                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1117                    maybe_instance_id = Some(ingestion_desc.instance_id);
1118
1119                    new_source_statistic_entries.insert(id);
1120                }
1121                DataSource::Sink { desc } => {
1122                    let mut dependency_since = Antichain::from_elem(T::minimum());
1123                    for read_hold in dependency_read_holds.iter() {
1124                        dependency_since.join_assign(read_hold.since());
1125                    }
1126
1127                    let [self_hold, read_hold] =
1128                        dependency_read_holds.try_into().expect("two holds");
1129
1130                    let state = ExportState::new(
1131                        desc.instance_id,
1132                        read_hold,
1133                        self_hold,
1134                        write_frontier.clone(),
1135                        ReadPolicy::step_back(),
1136                    );
1137                    maybe_instance_id = Some(state.cluster_id);
1138                    extra_state = CollectionStateExtra::Export(state);
1139
1140                    new_sink_statistic_entries.insert(id);
1141                }
1142            }
1143
1144            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
1145            let collection_state =
1146                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);
1147
1148            self.collections.insert(id, collection_state);
1149        }
1150
1151        {
1152            // Ensure all sources are associated with the statistics.
1153            //
1154            // We currently do not call `create_collections` after we have initialized the source
1155            // statistics scrapers, but in the interest of safety, avoid overriding existing
1156            // statistics values.
1157            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1158            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
1159
1160            for id in new_source_statistic_entries {
1161                source_statistics
1162                    .source_statistics
1163                    .entry(id)
1164                    .or_insert_with(|| StatsState::new(SourceStatisticsUpdate::new(id)));
1165            }
1166            for id in new_webhook_statistic_entries {
1167                source_statistics.webhook_statistics.entry(id).or_default();
1168            }
1169            for id in new_sink_statistic_entries {
1170                sink_statistics
1171                    .entry(id)
1172                    .or_insert_with(|| StatsState::new(SinkStatisticsUpdate::new(id)));
1173            }
1174        }
1175
1176        // Register the tables all in one batch.
1177        if !table_registers.is_empty() {
1178            let register_ts = register_ts
1179                .expect("caller should have provided a register_ts when creating a table");
1180
1181            if self.read_only {
1182                // In read-only mode, we use a special read-only table worker
1183                // that allows writing to migrated tables and will continually
1184                // bump their shard upper so that it tracks the txn shard upper.
1185                // We do this, so that they remain readable at a recent
1186                // timestamp, which in turn allows dataflows that depend on them
1187                // to (re-)hydrate.
1188                //
1189                // We only want to register migrated tables, though, and leave
1190                // existing tables out/never write to them in read-only mode.
1191                table_registers
1192                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));
1193
1194                self.persist_table_worker
1195                    .register(register_ts, table_registers)
1196                    .await
1197                    .expect("table worker unexpectedly shut down");
1198            } else {
1199                self.persist_table_worker
1200                    .register(register_ts, table_registers)
1201                    .await
1202                    .expect("table worker unexpectedly shut down");
1203            }
1204        }
1205
1206        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);
1207
1208        // TODO(guswynn): perform the io in this final section concurrently.
1209        for id in to_execute {
1210            match &self.collection(id)?.data_source {
1211                DataSource::Ingestion(ingestion) => {
1212                    if !self.read_only
1213                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
1214                            && ingestion.desc.connection.supports_read_only())
1215                    {
1216                        self.run_ingestion(id)?;
1217                    }
1218                }
1219                DataSource::IngestionExport { .. } => unreachable!(
1220                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
1221                ),
1222                DataSource::Introspection(_)
1223                | DataSource::Webhook
1224                | DataSource::Table { .. }
1225                | DataSource::Progress
1226                | DataSource::Other => {}
1227                DataSource::Sink { .. } => {
1228                    if !self.read_only {
1229                        self.run_export(id)?;
1230                    }
1231                }
1232            };
1233        }
1234
1235        Ok(())
1236    }
1237
1238    fn check_alter_ingestion_source_desc(
1239        &mut self,
1240        ingestion_id: GlobalId,
1241        source_desc: &SourceDesc,
1242    ) -> Result<(), StorageError<Self::Timestamp>> {
1243        let source_collection = self.collection(ingestion_id)?;
1244        let data_source = &source_collection.data_source;
1245        match &data_source {
1246            DataSource::Ingestion(cur_ingestion) => {
1247                cur_ingestion
1248                    .desc
1249                    .alter_compatible(ingestion_id, source_desc)?;
1250            }
1251            o => {
1252                tracing::info!(
1253                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1254                    o
1255                );
1256                Err(AlterError { id: ingestion_id })?
1257            }
1258        }
1259
1260        Ok(())
1261    }
1262
1263    async fn alter_ingestion_source_desc(
1264        &mut self,
1265        ingestion_id: GlobalId,
1266        source_desc: SourceDesc,
1267    ) -> Result<(), StorageError<Self::Timestamp>> {
1268        self.check_alter_ingestion_source_desc(ingestion_id, &source_desc)?;
1269
1270        // Update the `SourceDesc` and the source exports
1271        // simultaneously.
1272        let collection = self
1273            .collections
1274            .get_mut(&ingestion_id)
1275            .expect("validated exists");
1276        let curr_ingestion = match &mut collection.data_source {
1277            DataSource::Ingestion(curr_ingestion) => curr_ingestion,
1278            _ => unreachable!("verified collection refers to ingestion"),
1279        };
1280
1281        curr_ingestion.desc = source_desc;
1282        tracing::debug!("altered {ingestion_id}'s SourceDesc");
1283
1284        // n.b. we do not re-run updated ingestions because updating the source
1285        // desc is only done in preparation for adding subsources, which will
1286        // then run the ingestion.
1287        //
1288        // If this expectation ever changes, we will almost certainly know
1289        // because failing to run an altered ingestion means that whatever
1290        // changes you expect to occur will not be reflected in the running
1291        // dataflow.
1292
1293        Ok(())
1294    }
1295
1296    async fn alter_ingestion_connections(
1297        &mut self,
1298        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1299    ) -> Result<(), StorageError<Self::Timestamp>> {
1300        // Also have to let StorageCollections know!
1301        self.storage_collections
1302            .alter_ingestion_connections(source_connections.clone())
1303            .await?;
1304
1305        let mut ingestions_to_run = BTreeSet::new();
1306
1307        for (id, conn) in source_connections {
1308            let collection = self
1309                .collections
1310                .get_mut(&id)
1311                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1312
1313            match &mut collection.data_source {
1314                DataSource::Ingestion(ingestion) => {
1315                    // If the connection hasn't changed, there's no sense in
1316                    // re-rendering the dataflow.
1317                    if ingestion.desc.connection != conn {
1318                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1319                        ingestion.desc.connection = conn;
1320                        ingestions_to_run.insert(id);
1321                    } else {
1322                        tracing::warn!(
1323                            "update_source_connection called on {id} but the \
1324                            connection was the same"
1325                        );
1326                    }
1327                }
1328                o => {
1329                    tracing::warn!("update_source_connection called on {:?}", o);
1330                    Err(StorageError::IdentifierInvalid(id))?;
1331                }
1332            }
1333        }
1334
1335        for id in ingestions_to_run {
1336            self.run_ingestion(id)?;
1337        }
1338        Ok(())
1339    }
1340
1341    async fn alter_ingestion_export_data_configs(
1342        &mut self,
1343        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1344    ) -> Result<(), StorageError<Self::Timestamp>> {
1345        // Also have to let StorageCollections know!
1346        self.storage_collections
1347            .alter_ingestion_export_data_configs(source_exports.clone())
1348            .await?;
1349
1350        let mut ingestions_to_run = BTreeSet::new();
1351
1352        for (source_export_id, new_data_config) in source_exports {
1353            // We need to adjust the data config on the CollectionState for
1354            // the source export collection directly
1355            let source_export_collection = self
1356                .collections
1357                .get_mut(&source_export_id)
1358                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1359            let ingestion_id = match &mut source_export_collection.data_source {
1360                DataSource::IngestionExport {
1361                    ingestion_id,
1362                    details: _,
1363                    data_config,
1364                } => {
1365                    *data_config = new_data_config.clone();
1366                    *ingestion_id
1367                }
1368                o => {
1369                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1370                    Err(StorageError::IdentifierInvalid(source_export_id))?
1371                }
1372            };
1373            // We also need to adjust the data config on the CollectionState of the
1374            // Ingestion that the export is associated with.
1375            let ingestion_collection = self
1376                .collections
1377                .get_mut(&ingestion_id)
1378                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
1379
1380            match &mut ingestion_collection.data_source {
1381                DataSource::Ingestion(ingestion_desc) => {
1382                    let source_export = ingestion_desc
1383                        .source_exports
1384                        .get_mut(&source_export_id)
1385                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1386
1387                    // If the data config hasn't changed, there's no sense in
1388                    // re-rendering the dataflow.
1389                    if source_export.data_config != new_data_config {
1390                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
1391                        source_export.data_config = new_data_config;
1392
1393                        ingestions_to_run.insert(ingestion_id);
1394                    } else {
1395                        tracing::warn!(
1396                            "alter_ingestion_export_data_configs called on \
1397                                    export {source_export_id} of {ingestion_id} but \
1398                                    the data config was the same"
1399                        );
1400                    }
1401                }
1402                o => {
1403                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1404                    Err(StorageError::IdentifierInvalid(ingestion_id))?
1405                }
1406            }
1407        }
1408
1409        for id in ingestions_to_run {
1410            self.run_ingestion(id)?;
1411        }
1412        Ok(())
1413    }
1414
1415    async fn alter_table_desc(
1416        &mut self,
1417        existing_collection: GlobalId,
1418        new_collection: GlobalId,
1419        new_desc: RelationDesc,
1420        expected_version: RelationVersion,
1421        register_ts: Self::Timestamp,
1422    ) -> Result<(), StorageError<Self::Timestamp>> {
1423        let data_shard = {
1424            let Controller {
1425                collections,
1426                storage_collections,
1427                ..
1428            } = self;
1429
1430            let existing = collections
1431                .get(&existing_collection)
1432                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
1433            if !matches!(existing.data_source, DataSource::Table { .. }) {
1434                return Err(StorageError::IdentifierInvalid(existing_collection));
1435            }
1436
1437            // Let StorageCollections know!
1438            storage_collections
1439                .alter_table_desc(
1440                    existing_collection,
1441                    new_collection,
1442                    new_desc.clone(),
1443                    expected_version,
1444                )
1445                .await?;
1446
1447            existing.collection_metadata.data_shard.clone()
1448        };
1449
1450        let persist_client = self
1451            .persist
1452            .open(self.persist_location.clone())
1453            .await
1454            .expect("invalid persist location");
1455        let write_handle = self
1456            .open_data_handles(
1457                &existing_collection,
1458                data_shard,
1459                new_desc.clone(),
1460                &persist_client,
1461            )
1462            .await;
1463
1464        // Note: The new collection is now the "primary collection" so we specify `None` here.
1465        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
1466        let collection_meta = CollectionMetadata {
1467            persist_location: self.persist_location.clone(),
1468            data_shard,
1469            relation_desc: new_desc.clone(),
1470            // TODO(alter_table): Support schema evolution on sources.
1471            remap_shard: None,
1472            txns_shard: Some(self.txns_read.txns_id().clone()),
1473        };
1474        // TODO(alter_table): Support schema evolution on sources.
1475        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
1476        let collection_state = CollectionState::new(
1477            collection_desc.data_source.clone(),
1478            collection_meta,
1479            CollectionStateExtra::None,
1480            wallclock_lag_metrics,
1481        );
1482
1483        // Great! We have successfully evolved the schema of our Table, now we need to update our
1484        // in-memory data structures.
1485        self.collections.insert(new_collection, collection_state);
1486        let existing = self
1487            .collections
1488            .get_mut(&existing_collection)
1489            .expect("missing existing collection");
1490        assert!(matches!(
1491            existing.data_source,
1492            DataSource::Table { primary: None }
1493        ));
1494        existing.data_source = DataSource::Table {
1495            primary: Some(new_collection),
1496        };
1497
1498        self.persist_table_worker
1499            .register(register_ts, vec![(new_collection, write_handle)])
1500            .await
1501            .expect("table worker unexpectedly shut down");
1502
1503        Ok(())
1504    }
1505
1506    fn export(
1507        &self,
1508        id: GlobalId,
1509    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1510        self.collections
1511            .get(&id)
1512            .and_then(|c| match &c.extra_state {
1513                CollectionStateExtra::Export(state) => Some(state),
1514                _ => None,
1515            })
1516            .ok_or(StorageError::IdentifierMissing(id))
1517    }
1518
1519    fn export_mut(
1520        &mut self,
1521        id: GlobalId,
1522    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1523        self.collections
1524            .get_mut(&id)
1525            .and_then(|c| match &mut c.extra_state {
1526                CollectionStateExtra::Export(state) => Some(state),
1527                _ => None,
1528            })
1529            .ok_or(StorageError::IdentifierMissing(id))
1530    }
1531
1532    /// Create a oneshot ingestion.
1533    async fn create_oneshot_ingestion(
1534        &mut self,
1535        ingestion_id: uuid::Uuid,
1536        collection_id: GlobalId,
1537        instance_id: StorageInstanceId,
1538        request: OneshotIngestionRequest,
1539        result_tx: OneshotResultCallback<ProtoBatch>,
1540    ) -> Result<(), StorageError<Self::Timestamp>> {
1541        let collection_meta = self
1542            .collections
1543            .get(&collection_id)
1544            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1545            .collection_metadata
1546            .clone();
1547        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1548            // TODO(cf2): Refine this error.
1549            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1550        })?;
1551        let oneshot_cmd = RunOneshotIngestion {
1552            ingestion_id,
1553            collection_id,
1554            collection_meta,
1555            request,
1556        };
1557
1558        if !self.read_only {
1559            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1560            let pending = PendingOneshotIngestion {
1561                result_tx,
1562                cluster_id: instance_id,
1563            };
1564            let novel = self
1565                .pending_oneshot_ingestions
1566                .insert(ingestion_id, pending);
1567            assert_none!(novel);
1568            Ok(())
1569        } else {
1570            Err(StorageError::ReadOnly)
1571        }
1572    }
1573
1574    fn cancel_oneshot_ingestion(
1575        &mut self,
1576        ingestion_id: uuid::Uuid,
1577    ) -> Result<(), StorageError<Self::Timestamp>> {
1578        if self.read_only {
1579            return Err(StorageError::ReadOnly);
1580        }
1581
1582        let pending = self
1583            .pending_oneshot_ingestions
1584            .remove(&ingestion_id)
1585            .ok_or_else(|| {
1586                // TODO(cf2): Refine this error.
1587                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1588            })?;
1589
1590        match self.instances.get_mut(&pending.cluster_id) {
1591            Some(instance) => {
1592                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1593            }
1594            None => {
1595                mz_ore::soft_panic_or_log!(
1596                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1597                    ingestion_id,
1598                    pending.cluster_id,
1599                );
1600            }
1601        }
1602        // Respond to the user that the request has been canceled.
1603        pending.cancel();
1604
1605        Ok(())
1606    }
1607
1608    async fn alter_export(
1609        &mut self,
1610        id: GlobalId,
1611        new_description: ExportDescription<Self::Timestamp>,
1612    ) -> Result<(), StorageError<Self::Timestamp>> {
1613        let from_id = new_description.sink.from;
1614
1615        // Acquire read holds at StorageCollections to ensure that the
1616        // sinked collection is not dropped while we're sinking it.
1617        let desired_read_holds = vec![from_id.clone(), id.clone()];
1618        let [input_hold, self_hold] = self
1619            .storage_collections
1620            .acquire_read_holds(desired_read_holds)
1621            .expect("missing dependency")
1622            .try_into()
1623            .expect("expected number of holds");
1624        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1625        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1626
1627        // Check whether the sink's write frontier is beyond the read hold we got
1628        let cur_export = self.export_mut(id)?;
1629        let input_readable = cur_export
1630            .write_frontier
1631            .iter()
1632            .all(|t| input_hold.since().less_than(t));
1633        if !input_readable {
1634            return Err(StorageError::ReadBeforeSince(from_id));
1635        }
1636
1637        let new_export = ExportState {
1638            read_capabilities: cur_export.read_capabilities.clone(),
1639            cluster_id: new_description.instance_id,
1640            derived_since: cur_export.derived_since.clone(),
1641            read_holds: [input_hold, self_hold],
1642            read_policy: cur_export.read_policy.clone(),
1643            write_frontier: cur_export.write_frontier.clone(),
1644        };
1645        *cur_export = new_export;
1646
1647        let cmd = RunSinkCommand {
1648            id,
1649            description: StorageSinkDesc {
1650                from: from_id,
1651                from_desc: new_description.sink.from_desc,
1652                connection: new_description.sink.connection,
1653                envelope: new_description.sink.envelope,
1654                as_of: new_description.sink.as_of,
1655                version: new_description.sink.version,
1656                from_storage_metadata,
1657                with_snapshot: new_description.sink.with_snapshot,
1658                to_storage_metadata,
1659            },
1660        };
1661
1662        // Fetch the client for this export's cluster.
1663        let instance = self
1664            .instances
1665            .get_mut(&new_description.instance_id)
1666            .ok_or_else(|| StorageError::ExportInstanceMissing {
1667                storage_instance_id: new_description.instance_id,
1668                export_id: id,
1669            })?;
1670
1671        instance.send(StorageCommand::RunSink(Box::new(cmd)));
1672        Ok(())
1673    }
1674
1675    /// Create the sinks described by the `ExportDescription`.
1676    async fn alter_export_connections(
1677        &mut self,
1678        exports: BTreeMap<GlobalId, StorageSinkConnection>,
1679    ) -> Result<(), StorageError<Self::Timestamp>> {
1680        let mut updates_by_instance =
1681            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();
1682
1683        for (id, connection) in exports {
1684            // We stage changes in new_export_description and then apply all
1685            // updates to exports at the end.
1686            //
1687            // We don't just go ahead and clone the `ExportState` itself and
1688            // update that because `ExportState` is not clone, because it holds
1689            // a `ReadHandle` and cloning that would cause additional work for
1690            // whoever guarantees those read holds.
1691            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
1692                let export = &self.collections[&id];
1693                let DataSource::Sink { desc } = &export.data_source else {
1694                    panic!("export exists")
1695                };
1696                let CollectionStateExtra::Export(state) = &export.extra_state else {
1697                    panic!("export exists")
1698                };
1699                let export_description = desc.clone();
1700                let as_of = state.input_hold().since().clone();
1701
1702                (export_description, as_of)
1703            };
1704            let current_sink = new_export_description.sink.clone();
1705
1706            new_export_description.sink.connection = connection;
1707
1708            // Ensure compatibility
1709            current_sink.alter_compatible(id, &new_export_description.sink)?;
1710
1711            let from_storage_metadata = self
1712                .storage_collections
1713                .collection_metadata(new_export_description.sink.from)?;
1714            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1715
1716            let cmd = RunSinkCommand {
1717                id,
1718                description: StorageSinkDesc {
1719                    from: new_export_description.sink.from,
1720                    from_desc: new_export_description.sink.from_desc.clone(),
1721                    connection: new_export_description.sink.connection.clone(),
1722                    envelope: new_export_description.sink.envelope,
1723                    with_snapshot: new_export_description.sink.with_snapshot,
1724                    version: new_export_description.sink.version,
1725                    // Here we are about to send a RunSinkCommand with the current read capaibility
1726                    // held by this sink. However, clusters are already running a version of the
1727                    // sink and nothing guarantees that by the time this command arrives at the
1728                    // clusters they won't have made additional progress such that this read
1729                    // capability is invalidated.
1730                    // The solution to this problem is for the controller to track specific
1731                    // executions of dataflows such that it can track the shutdown of the current
1732                    // instance and the initialization of the new instance separately and ensure
1733                    // read holds are held for the correct amount of time.
1734                    // TODO(petrosagg): change the controller to explicitly track dataflow executions
1735                    as_of: as_of.to_owned(),
1736                    from_storage_metadata,
1737                    to_storage_metadata,
1738                },
1739            };
1740
1741            let update = updates_by_instance
1742                .entry(new_export_description.instance_id)
1743                .or_default();
1744            update.push((cmd, new_export_description));
1745        }
1746
1747        for (instance_id, updates) in updates_by_instance {
1748            let mut export_updates = BTreeMap::new();
1749            let mut cmds = Vec::with_capacity(updates.len());
1750
1751            for (cmd, export_state) in updates {
1752                export_updates.insert(cmd.id, export_state);
1753                cmds.push(cmd);
1754            }
1755
1756            // Fetch the client for this exports's cluster.
1757            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1758                StorageError::ExportInstanceMissing {
1759                    storage_instance_id: instance_id,
1760                    export_id: *export_updates
1761                        .keys()
1762                        .next()
1763                        .expect("set of exports not empty"),
1764                }
1765            })?;
1766
1767            for cmd in cmds {
1768                instance.send(StorageCommand::RunSink(Box::new(cmd)));
1769            }
1770
1771            // Update state only after all possible errors have occurred.
1772            for (id, new_export_description) in export_updates {
1773                let Some(state) = self.collections.get_mut(&id) else {
1774                    panic!("export known to exist")
1775                };
1776                let DataSource::Sink { desc } = &mut state.data_source else {
1777                    panic!("export known to exist")
1778                };
1779                *desc = new_export_description;
1780            }
1781        }
1782
1783        Ok(())
1784    }
1785
1786    // Dropping a table takes roughly the following flow:
1787    //
1788    // First determine if this is a TableWrites table or a source-fed table (an IngestionExport):
1789    //
1790    // If this is a TableWrites table:
1791    //   1. We remove the table from the persist table write worker.
1792    //   2. The table removal is awaited in an async task.
1793    //   3. A message is sent to the storage controller that the table has been removed from the
1794    //      table write worker.
1795    //   4. The controller drains all table drop messages during `process`.
1796    //   5. `process` calls `drop_sources` with the dropped tables.
1797    //
1798    // If this is an IngestionExport table:
1799    //   1. We validate the ids and then call drop_sources_unvalidated to proceed dropping.
1800    fn drop_tables(
1801        &mut self,
1802        storage_metadata: &StorageMetadata,
1803        identifiers: Vec<GlobalId>,
1804        ts: Self::Timestamp,
1805    ) -> Result<(), StorageError<Self::Timestamp>> {
1806        // Collect tables by their data_source
1807        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1808            .into_iter()
1809            .partition(|id| match self.collections[id].data_source {
1810                DataSource::Table { .. } => true,
1811                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1812                _ => panic!("identifier is not a table: {}", id),
1813            });
1814
1815        // Drop table write tables
1816        if table_write_ids.len() > 0 {
1817            let drop_notif = self
1818                .persist_table_worker
1819                .drop_handles(table_write_ids.clone(), ts);
1820            let tx = self.pending_table_handle_drops_tx.clone();
1821            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1822                drop_notif.await;
1823                for identifier in table_write_ids {
1824                    let _ = tx.send(identifier);
1825                }
1826            });
1827        }
1828
1829        // Drop source-fed tables
1830        if data_source_ids.len() > 0 {
1831            self.validate_collection_ids(data_source_ids.iter().cloned())?;
1832            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1833        }
1834
1835        Ok(())
1836    }
1837
1838    fn drop_sources(
1839        &mut self,
1840        storage_metadata: &StorageMetadata,
1841        identifiers: Vec<GlobalId>,
1842    ) -> Result<(), StorageError<Self::Timestamp>> {
1843        self.validate_collection_ids(identifiers.iter().cloned())?;
1844        self.drop_sources_unvalidated(storage_metadata, identifiers)
1845    }
1846
1847    fn drop_sources_unvalidated(
1848        &mut self,
1849        storage_metadata: &StorageMetadata,
1850        ids: Vec<GlobalId>,
1851    ) -> Result<(), StorageError<Self::Timestamp>> {
1852        // Keep track of which ingestions we have to execute still, and which we
1853        // have to change because of dropped subsources.
1854        let mut ingestions_to_execute = BTreeSet::new();
1855        let mut ingestions_to_drop = BTreeSet::new();
1856        let mut source_statistics_to_drop = Vec::new();
1857
1858        // Ingestions (and their exports) are also collections, but we keep
1859        // track of non-ingestion collections separately, because they have
1860        // slightly different cleanup logic below.
1861        let mut collections_to_drop = Vec::new();
1862
1863        for id in ids.iter() {
1864            let metadata = storage_metadata.get_collection_shard::<T>(*id);
1865            mz_ore::soft_assert_or_log!(
1866                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
1867                "dropping {id}, but drop was not synchronized with storage \
1868                controller via `synchronize_collections`"
1869            );
1870
1871            let collection_state = self.collections.get(id);
1872
1873            if let Some(collection_state) = collection_state {
1874                match collection_state.data_source {
1875                    DataSource::Webhook => {
1876                        // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
1877                        // could probably use some love and maybe get merged together?
1878                        let fut = self.collection_manager.unregister_collection(*id);
1879                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);
1880
1881                        collections_to_drop.push(*id);
1882                        source_statistics_to_drop.push(*id);
1883                    }
1884                    DataSource::Ingestion(_) => {
1885                        ingestions_to_drop.insert(*id);
1886                        source_statistics_to_drop.push(*id);
1887                    }
1888                    DataSource::IngestionExport { ingestion_id, .. } => {
1889                        // If we are dropping source exports, we need to modify the
1890                        // ingestion that it runs on.
1891                        //
1892                        // If we remove this export, we need to stop producing data to
1893                        // it, so plan to re-execute the ingestion with the amended
1894                        // description.
1895                        ingestions_to_execute.insert(ingestion_id);
1896
1897                        // Adjust the source to remove this export.
1898                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
1899                            Some(ingestion_collection) => ingestion_collection,
1900                            // Primary ingestion already dropped.
1901                            None => {
1902                                tracing::error!(
1903                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
1904                                );
1905                                continue;
1906                            }
1907                        };
1908
1909                        match &mut ingestion_state.data_source {
1910                            DataSource::Ingestion(ingestion_desc) => {
1911                                let removed = ingestion_desc.source_exports.remove(id);
1912                                mz_ore::soft_assert_or_log!(
1913                                    removed.is_some(),
1914                                    "dropped subsource {id} already removed from source exports"
1915                                );
1916                            }
1917                            _ => unreachable!(
1918                                "SourceExport must only refer to primary sources that already exist"
1919                            ),
1920                        };
1921
1922                        // Ingestion exports also have ReadHolds that we need to
1923                        // downgrade, and much of their drop machinery is the
1924                        // same as for the "main" ingestion.
1925                        ingestions_to_drop.insert(*id);
1926                        source_statistics_to_drop.push(*id);
1927                    }
1928                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
1929                        collections_to_drop.push(*id);
1930                    }
1931                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
1932                        // Collections of these types are either not sources and should be dropped
1933                        // through other means, or are sources but should never be dropped.
1934                        soft_panic_or_log!(
1935                            "drop_sources called on a {:?} (id={id}))",
1936                            collection_state.data_source,
1937                        );
1938                    }
1939                }
1940            }
1941        }
1942
1943        // Do not bother re-executing ingestions we know we plan to drop.
1944        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
1945        for ingestion_id in ingestions_to_execute {
1946            self.run_ingestion(ingestion_id)?;
1947        }
1948
1949        // For ingestions, we fabricate a new hold that will propagate through
1950        // the cluster and then back to us.
1951
1952        // We don't explicitly remove read capabilities! Downgrading the
1953        // frontier of the source to `[]` (the empty Antichain), will propagate
1954        // to the storage dependencies.
1955        let ingestion_policies = ingestions_to_drop
1956            .iter()
1957            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
1958            .collect();
1959
1960        tracing::debug!(
1961            ?ingestion_policies,
1962            "dropping sources by setting read hold policies"
1963        );
1964        self.set_hold_policies(ingestion_policies);
1965
1966        // Delete all collection->shard mappings
1967        let shards_to_update: BTreeSet<_> = ingestions_to_drop
1968            .iter()
1969            .chain(collections_to_drop.iter())
1970            .cloned()
1971            .collect();
1972        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);
1973
1974        let status_now = mz_ore::now::to_datetime((self.now)());
1975        let mut status_updates = vec![];
1976        for id in ingestions_to_drop.iter() {
1977            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
1978        }
1979
1980        if !self.read_only {
1981            self.append_status_introspection_updates(
1982                IntrospectionType::SourceStatusHistory,
1983                status_updates,
1984            );
1985        }
1986
1987        {
1988            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1989            for id in source_statistics_to_drop {
1990                source_statistics.source_statistics.remove(&id);
1991                source_statistics.webhook_statistics.remove(&id);
1992            }
1993        }
1994
1995        // Remove collection state
1996        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
1997            tracing::info!(%id, "dropping collection state");
1998            let collection = self
1999                .collections
2000                .remove(id)
2001                .expect("list populated after checking that self.collections contains it");
2002
2003            let instance = match &collection.extra_state {
2004                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2005                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2006                CollectionStateExtra::None => None,
2007            }
2008            .and_then(|i| self.instances.get(&i));
2009
2010            // Record which replicas were running a collection, so that we can
2011            // match DroppedId messages against them and eventually remove state
2012            // from self.dropped_objects
2013            if let Some(instance) = instance {
2014                let active_replicas = instance.get_active_replicas_for_object(id);
2015                if !active_replicas.is_empty() {
2016                    // The remap collection of an ingestion doesn't have extra
2017                    // state and doesn't have an instance_id, but we still get
2018                    // upper updates for them, so want to make sure to populate
2019                    // dropped_ids.
2020                    // TODO(aljoscha): All this is a bit icky. But here we are
2021                    // for now...
2022                    match &collection.data_source {
2023                        DataSource::Ingestion(ingestion_desc) => {
2024                            self.dropped_objects.insert(
2025                                ingestion_desc.remap_collection_id,
2026                                active_replicas.clone(),
2027                            );
2028                        }
2029                        _ => {}
2030                    }
2031
2032                    self.dropped_objects.insert(*id, active_replicas);
2033                }
2034            }
2035        }
2036
2037        // Also let StorageCollections know!
2038        self.storage_collections
2039            .drop_collections_unvalidated(storage_metadata, ids);
2040
2041        Ok(())
2042    }
2043
2044    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
2045    fn drop_sinks(
2046        &mut self,
2047        storage_metadata: &StorageMetadata,
2048        identifiers: Vec<GlobalId>,
2049    ) -> Result<(), StorageError<Self::Timestamp>> {
2050        self.validate_export_ids(identifiers.iter().cloned())?;
2051        self.drop_sinks_unvalidated(storage_metadata, identifiers);
2052        Ok(())
2053    }
2054
2055    fn drop_sinks_unvalidated(
2056        &mut self,
2057        storage_metadata: &StorageMetadata,
2058        mut sinks_to_drop: Vec<GlobalId>,
2059    ) {
2060        // Ignore exports that have already been removed.
2061        sinks_to_drop.retain(|id| self.export(*id).is_ok());
2062
2063        // TODO: ideally we'd advance the write frontier ourselves here, but this function's
2064        // not yet marked async.
2065
2066        // We don't explicitly remove read capabilities! Downgrading the
2067        // frontier of the source to `[]` (the empty Antichain), will propagate
2068        // to the storage dependencies.
2069        let drop_policy = sinks_to_drop
2070            .iter()
2071            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2072            .collect();
2073
2074        tracing::debug!(
2075            ?drop_policy,
2076            "dropping sources by setting read hold policies"
2077        );
2078        self.set_hold_policies(drop_policy);
2079
2080        // Record the drop status for all sink drops.
2081        //
2082        // We also delete the items' statistics objects.
2083        //
2084        // The locks are held for a short time, only while we do some removals from a map.
2085
2086        let status_now = mz_ore::now::to_datetime((self.now)());
2087
2088        // Record the drop status for all pending sink drops.
2089        let mut status_updates = vec![];
2090        {
2091            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2092            for id in sinks_to_drop.iter() {
2093                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2094                sink_statistics.remove(id);
2095            }
2096        }
2097
2098        if !self.read_only {
2099            self.append_status_introspection_updates(
2100                IntrospectionType::SinkStatusHistory,
2101                status_updates,
2102            );
2103        }
2104
2105        // Remove collection/export state
2106        for id in sinks_to_drop.iter() {
2107            tracing::info!(%id, "dropping export state");
2108            let collection = self
2109                .collections
2110                .remove(id)
2111                .expect("list populated after checking that self.collections contains it");
2112
2113            let instance = match &collection.extra_state {
2114                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2115                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2116                CollectionStateExtra::None => None,
2117            }
2118            .and_then(|i| self.instances.get(&i));
2119
2120            // Record how many replicas were running an export, so that we can
2121            // match `DroppedId` messages against it and eventually remove state
2122            // from `self.dropped_objects`.
2123            if let Some(instance) = instance {
2124                let active_replicas = instance.get_active_replicas_for_object(id);
2125                if !active_replicas.is_empty() {
2126                    self.dropped_objects.insert(*id, active_replicas);
2127                }
2128            }
2129        }
2130
2131        // Also let StorageCollections know!
2132        self.storage_collections
2133            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2134    }
2135
2136    #[instrument(level = "debug")]
2137    fn append_table(
2138        &mut self,
2139        write_ts: Self::Timestamp,
2140        advance_to: Self::Timestamp,
2141        commands: Vec<(GlobalId, Vec<TableData>)>,
2142    ) -> Result<
2143        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2144        StorageError<Self::Timestamp>,
2145    > {
2146        if self.read_only {
2147            // While in read only mode, ONLY collections that have been migrated
2148            // and need to be re-hydrated in read only mode can be written to.
2149            if !commands
2150                .iter()
2151                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2152            {
2153                return Err(StorageError::ReadOnly);
2154            }
2155        }
2156
2157        // TODO(petrosagg): validate appends against the expected RelationDesc of the collection
2158        for (id, updates) in commands.iter() {
2159            if !updates.is_empty() {
2160                if !write_ts.less_than(&advance_to) {
2161                    return Err(StorageError::UpdateBeyondUpper(*id));
2162                }
2163            }
2164        }
2165
2166        Ok(self
2167            .persist_table_worker
2168            .append(write_ts, advance_to, commands))
2169    }
2170
2171    fn monotonic_appender(
2172        &self,
2173        id: GlobalId,
2174    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
2175        self.collection_manager.monotonic_appender(id)
2176    }
2177
2178    fn webhook_statistics(
2179        &self,
2180        id: GlobalId,
2181    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2182        // Call to this method are usually cached so the lock is not in the critical path.
2183        let source_statistics = self.source_statistics.lock().expect("poisoned");
2184        source_statistics
2185            .webhook_statistics
2186            .get(&id)
2187            .cloned()
2188            .ok_or(StorageError::IdentifierMissing(id))
2189    }
2190
2191    async fn ready(&mut self) {
2192        if self.maintenance_scheduled {
2193            return;
2194        }
2195
2196        if !self.pending_table_handle_drops_rx.is_empty() {
2197            return;
2198        }
2199
2200        tokio::select! {
2201            Some(m) = self.instance_response_rx.recv() => {
2202                self.stashed_responses.push(m);
2203                while let Ok(m) = self.instance_response_rx.try_recv() {
2204                    self.stashed_responses.push(m);
2205                }
2206            }
2207            _ = self.maintenance_ticker.tick() => {
2208                self.maintenance_scheduled = true;
2209            },
2210        };
2211    }
2212
2213    #[instrument(level = "debug")]
2214    fn process(
2215        &mut self,
2216        storage_metadata: &StorageMetadata,
2217    ) -> Result<Option<Response<T>>, anyhow::Error> {
2218        // Perform periodic maintenance work.
2219        if self.maintenance_scheduled {
2220            self.maintain();
2221            self.maintenance_scheduled = false;
2222        }
2223
2224        for instance in self.instances.values_mut() {
2225            instance.rehydrate_failed_replicas();
2226        }
2227
2228        let mut status_updates = vec![];
2229        let mut updated_frontiers = BTreeMap::new();
2230
2231        // Take the currently stashed responses so that we can call mut receiver functions in the loop.
2232        let stashed_responses = std::mem::take(&mut self.stashed_responses);
2233        for resp in stashed_responses {
2234            match resp {
2235                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
2236                    self.update_write_frontier(id, &upper);
2237                    updated_frontiers.insert(id, upper);
2238                }
2239                (replica_id, StorageResponse::DroppedId(id)) => {
2240                    let replica_id = replica_id.expect("DroppedId from unknown replica");
2241                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
2242                        remaining_replicas.remove(&replica_id);
2243                        if remaining_replicas.is_empty() {
2244                            self.dropped_objects.remove(&id);
2245                        }
2246                    } else {
2247                        soft_panic_or_log!("unexpected DroppedId for {id}");
2248                    }
2249                }
2250                (_replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
2251                    // Note we only hold the locks while moving some plain-old-data around here.
2252                    //
2253                    // We just write the whole object, as the update from storage represents the
2254                    // current values.
2255                    //
2256                    // We don't overwrite removed objects, as we may have received a late
2257                    // `StatisticsUpdates` while we were shutting down the storage object.
2258                    {
2259                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");
2260                        for stat in source_stats {
2261                            // Don't override it if its been removed.
2262                            shared_stats
2263                                .source_statistics
2264                                .entry(stat.id)
2265                                .and_modify(|current| current.stat().incorporate(stat));
2266                        }
2267                    }
2268
2269                    {
2270                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
2271                        for stat in sink_stats {
2272                            // Don't override it if its been removed.
2273                            shared_stats
2274                                .entry(stat.id)
2275                                .and_modify(|current| current.stat().incorporate(stat));
2276                        }
2277                    }
2278                }
2279                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
2280                    // NOTE(aljoscha): We sniff out the hydration status for
2281                    // ingestions from status updates. This is the easiest we
2282                    // can do right now, without going deeper into changing the
2283                    // comms protocol between controller and cluster. We cannot,
2284                    // for example use `StorageResponse::FrontierUpper`,
2285                    // because those will already get sent when the ingestion is
2286                    // just being created.
2287                    //
2288                    // Sources differ in when they will report as Running. Kafka
2289                    // UPSERT sources will only switch to `Running` once their
2290                    // state has been initialized from persist, which is the
2291                    // first case that we care about right now.
2292                    //
2293                    // I wouldn't say it's ideal, but it's workable until we
2294                    // find something better.
2295                    match status_update.status {
2296                        Status::Running => {
2297                            let collection = self.collections.get_mut(&status_update.id);
2298                            match collection {
2299                                Some(collection) => {
2300                                    match collection.extra_state {
2301                                        CollectionStateExtra::Ingestion(
2302                                            ref mut ingestion_state,
2303                                        ) => {
2304                                            if ingestion_state.hydrated_on.is_empty() {
2305                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
2306                                            }
2307                                            ingestion_state.hydrated_on.insert(replica_id.expect(
2308                                                "replica id should be present for status running",
2309                                            ));
2310                                        }
2311                                        CollectionStateExtra::Export(_) => {
2312                                            // TODO(sinks): track sink hydration?
2313                                        }
2314                                        CollectionStateExtra::None => {
2315                                            // Nothing to do
2316                                        }
2317                                    }
2318                                }
2319                                None => (), // no collection, let's say that's fine
2320                                            // here
2321                            }
2322                        }
2323                        Status::Paused => {
2324                            let collection = self.collections.get_mut(&status_update.id);
2325                            match collection {
2326                                Some(collection) => {
2327                                    match collection.extra_state {
2328                                        CollectionStateExtra::Ingestion(
2329                                            ref mut ingestion_state,
2330                                        ) => {
2331                                            // TODO: Paused gets send when there
2332                                            // are no active replicas. We should
2333                                            // change this to send a targeted
2334                                            // Pause for each replica, and do
2335                                            // more fine-grained hydration
2336                                            // tracking here.
2337                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
2338                                            ingestion_state.hydrated_on.clear();
2339                                        }
2340                                        CollectionStateExtra::Export(_) => {
2341                                            // TODO(sinks): track sink hydration?
2342                                        }
2343                                        CollectionStateExtra::None => {
2344                                            // Nothing to do
2345                                        }
2346                                    }
2347                                }
2348                                None => (), // no collection, let's say that's fine
2349                                            // here
2350                            }
2351                        }
2352                        _ => (),
2353                    }
2354
2355                    // Set replica_id in the status update if available
2356                    if let Some(id) = replica_id {
2357                        status_update.replica_id = Some(id);
2358                    }
2359                    status_updates.push(status_update);
2360                }
2361                (_replica_id, StorageResponse::StagedBatches(batches)) => {
2362                    for (ingestion_id, batches) in batches {
2363                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
2364                            Some(pending) => {
2365                                // Send a cancel command so our command history is correct. And to
2366                                // avoid duplicate work once we have active replication.
2367                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
2368                                {
2369                                    instance
2370                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
2371                                }
2372                                // Send the results down our channel.
2373                                (pending.result_tx)(batches)
2374                            }
2375                            // TODO(cf2): When we support running COPY FROM on multiple
2376                            // replicas we can probably just ignore the case of `None`.
2377                            None => mz_ore::soft_panic_or_log!("no sender for {ingestion_id}!"),
2378                        }
2379                    }
2380                }
2381            }
2382        }
2383
2384        self.record_status_updates(status_updates);
2385
2386        // Process dropped tables in a single batch.
2387        let mut dropped_table_ids = Vec::new();
2388        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
2389            dropped_table_ids.push(dropped_id);
2390        }
2391        if !dropped_table_ids.is_empty() {
2392            self.drop_sources(storage_metadata, dropped_table_ids)?;
2393        }
2394
2395        if updated_frontiers.is_empty() {
2396            Ok(None)
2397        } else {
2398            Ok(Some(Response::FrontierUpdates(
2399                updated_frontiers.into_iter().collect(),
2400            )))
2401        }
2402    }
2403
2404    async fn inspect_persist_state(
2405        &self,
2406        id: GlobalId,
2407    ) -> Result<serde_json::Value, anyhow::Error> {
2408        let collection = &self.storage_collections.collection_metadata(id)?;
2409        let client = self
2410            .persist
2411            .open(collection.persist_location.clone())
2412            .await?;
2413        let shard_state = client
2414            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2415            .await?;
2416        let json_state = serde_json::to_value(shard_state)?;
2417        Ok(json_state)
2418    }
2419
2420    fn append_introspection_updates(
2421        &mut self,
2422        type_: IntrospectionType,
2423        updates: Vec<(Row, Diff)>,
2424    ) {
2425        let id = self.introspection_ids[&type_];
2426        let updates = updates.into_iter().map(|update| update.into()).collect();
2427        self.collection_manager.blind_write(id, updates);
2428    }
2429
2430    fn append_status_introspection_updates(
2431        &mut self,
2432        type_: IntrospectionType,
2433        updates: Vec<StatusUpdate>,
2434    ) {
2435        let id = self.introspection_ids[&type_];
2436        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2437        if !updates.is_empty() {
2438            self.collection_manager.blind_write(id, updates);
2439        }
2440    }
2441
2442    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2443        let id = self.introspection_ids[&type_];
2444        self.collection_manager.differential_write(id, op);
2445    }
2446
2447    fn append_only_introspection_tx(
2448        &self,
2449        type_: IntrospectionType,
2450    ) -> mpsc::UnboundedSender<(
2451        Vec<AppendOnlyUpdate>,
2452        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2453    )> {
2454        let id = self.introspection_ids[&type_];
2455        self.collection_manager.append_only_write_sender(id)
2456    }
2457
2458    fn differential_introspection_tx(
2459        &self,
2460        type_: IntrospectionType,
2461    ) -> mpsc::UnboundedSender<(
2462        StorageWriteOp,
2463        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2464    )> {
2465        let id = self.introspection_ids[&type_];
2466        self.collection_manager.differential_write_sender(id)
2467    }
2468
2469    async fn real_time_recent_timestamp(
2470        &self,
2471        timestamp_objects: BTreeSet<GlobalId>,
2472        timeout: Duration,
2473    ) -> Result<
2474        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
2475        StorageError<Self::Timestamp>,
2476    > {
2477        use mz_storage_types::sources::GenericSourceConnection;
2478
2479        let mut rtr_futures = BTreeMap::new();
2480
2481        // Only user sources can be read from w/ RTR.
2482        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
2483            let collection = match self.collection(id) {
2484                Ok(c) => c,
2485                // Not a storage item, which we accept.
2486                Err(_) => continue,
2487            };
2488
2489            let (source_conn, remap_id) = match &collection.data_source {
2490                DataSource::Ingestion(IngestionDescription {
2491                    desc: SourceDesc { connection, .. },
2492                    remap_collection_id,
2493                    ..
2494                }) => match connection {
2495                    GenericSourceConnection::Kafka(_)
2496                    | GenericSourceConnection::Postgres(_)
2497                    | GenericSourceConnection::MySql(_)
2498                    | GenericSourceConnection::SqlServer(_) => {
2499                        (connection.clone(), *remap_collection_id)
2500                    }
2501
2502                    // These internal sources do not yet (and might never)
2503                    // support RTR. However, erroring if they're selected from
2504                    // poses an annoying user experience, so instead just skip
2505                    // over them.
2506                    GenericSourceConnection::LoadGenerator(_) => continue,
2507                },
2508                // Skip over all other objects
2509                _ => {
2510                    continue;
2511                }
2512            };
2513
2514            // Prepare for getting the external system's frontier.
2515            let config = self.config().clone();
2516
2517            // Determine the remap collection we plan to read from.
2518            //
2519            // Note that the process of reading from the remap shard is the same
2520            // as other areas in this code that do the same thing, but we inline
2521            // it here because we must prove that we have not taken ownership of
2522            // `self` to move the stream of data from the remap shard into a
2523            // future.
2524            let read_handle = self.read_handle_for_snapshot(remap_id).await?;
2525
2526            // Have to acquire a read hold to prevent the since from advancing
2527            // while we read.
2528            let remap_read_hold = self
2529                .storage_collections
2530                .acquire_read_holds(vec![remap_id])
2531                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
2532                .expect_element(|| "known to be exactly one");
2533
2534            let remap_as_of = remap_read_hold
2535                .since()
2536                .to_owned()
2537                .into_option()
2538                .ok_or(StorageError::ReadBeforeSince(remap_id))?;
2539
2540            rtr_futures.insert(
2541                id,
2542                tokio::time::timeout(timeout, async move {
2543                    use mz_storage_types::sources::SourceConnection as _;
2544
2545                    // Fetch the remap shard's contents; we must do this first so
2546                    // that the `as_of` doesn't change.
2547                    let as_of = Antichain::from_elem(remap_as_of);
2548                    let remap_subscribe = read_handle
2549                        .subscribe(as_of.clone())
2550                        .await
2551                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;
2552
2553                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");
2554
2555                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
2556                        .await.map_err(|e| {
2557                            tracing::debug!(?id, "real time recency error: {:?}", e);
2558                            e
2559                        });
2560
2561                    // Drop once we have read succesfully.
2562                    drop(remap_read_hold);
2563
2564                    result
2565                }),
2566            );
2567        }
2568
2569        Ok(Box::pin(async move {
2570            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
2571            ids.into_iter()
2572                .zip_eq(futures::future::join_all(futs).await)
2573                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
2574                    let new =
2575                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
2576                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
2577                })
2578        }))
2579    }
2580}
2581
2582/// Seed [`StorageTxn`] with any state required to instantiate a
2583/// [`StorageController`].
2584///
2585/// This cannot be a member of [`StorageController`] because it cannot take a
2586/// `self` parameter.
2587///
2588pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2589    if txn.get_txn_wal_shard().is_none() {
2590        let txns_id = ShardId::new();
2591        txn.write_txn_wal_shard(txns_id)?;
2592    }
2593
2594    Ok(())
2595}
2596
2597impl<T> Controller<T>
2598where
2599    T: Timestamp
2600        + Lattice
2601        + TotalOrder
2602        + Codec64
2603        + From<EpochMillis>
2604        + TimestampManipulation
2605        + Into<Datum<'static>>,
2606    StorageCommand<T>: RustType<ProtoStorageCommand>,
2607    StorageResponse<T>: RustType<ProtoStorageResponse>,
2608    Self: StorageController<Timestamp = T>,
2609{
2610    /// Create a new storage controller from a client it should wrap.
2611    ///
2612    /// Note that when creating a new storage controller, you must also
2613    /// reconcile it with the previous state.
2614    ///
2615    /// # Panics
2616    /// If this function is called before [`prepare_initialization`].
2617    pub async fn new(
2618        build_info: &'static BuildInfo,
2619        persist_location: PersistLocation,
2620        persist_clients: Arc<PersistClientCache>,
2621        now: NowFn,
2622        wallclock_lag: WallclockLagFn<T>,
2623        txns_metrics: Arc<TxnMetrics>,
2624        envd_epoch: NonZeroI64,
2625        read_only: bool,
2626        metrics_registry: &MetricsRegistry,
2627        controller_metrics: ControllerMetrics,
2628        connection_context: ConnectionContext,
2629        txn: &dyn StorageTxn<T>,
2630        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
2631    ) -> Self {
2632        let txns_client = persist_clients
2633            .open(persist_location.clone())
2634            .await
2635            .expect("location should be valid");
2636
2637        let persist_warm_task = warm_persist_state_in_background(
2638            txns_client.clone(),
2639            txn.get_collection_metadata().into_values(),
2640        );
2641        let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2642
2643        // This value must be already installed because we must ensure it's
2644        // durably recorded before it is used, otherwise we risk leaking persist
2645        // state.
2646        let txns_id = txn
2647            .get_txn_wal_shard()
2648            .expect("must call prepare initialization before creating storage controller");
2649
2650        let persist_table_worker = if read_only {
2651            let txns_write = txns_client
2652                .open_writer(
2653                    txns_id,
2654                    Arc::new(TxnsCodecRow::desc()),
2655                    Arc::new(UnitSchema),
2656                    Diagnostics {
2657                        shard_name: "txns".to_owned(),
2658                        handle_purpose: "follow txns upper".to_owned(),
2659                    },
2660                )
2661                .await
2662                .expect("txns schema shouldn't change");
2663            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2664        } else {
2665            let txns = TxnsHandle::open(
2666                T::minimum(),
2667                txns_client.clone(),
2668                txns_client.dyncfgs().clone(),
2669                Arc::clone(&txns_metrics),
2670                txns_id,
2671            )
2672            .await;
2673            persist_handles::PersistTableWriteWorker::new_txns(txns)
2674        };
2675        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2676
2677        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2678
2679        let introspection_ids = BTreeMap::new();
2680        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2681
2682        let (statistics_interval_sender, _) =
2683            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2684
2685        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2686            tokio::sync::mpsc::unbounded_channel();
2687
2688        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2689        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2690
2691        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2692
2693        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2694
2695        let now_dt = mz_ore::now::to_datetime(now());
2696
2697        Self {
2698            build_info,
2699            collections: BTreeMap::default(),
2700            dropped_objects: Default::default(),
2701            persist_table_worker,
2702            txns_read,
2703            txns_metrics,
2704            stashed_responses: vec![],
2705            pending_table_handle_drops_tx,
2706            pending_table_handle_drops_rx,
2707            pending_oneshot_ingestions: BTreeMap::default(),
2708            collection_manager,
2709            introspection_ids,
2710            introspection_tokens,
2711            now,
2712            envd_epoch,
2713            read_only,
2714            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2715                source_statistics: BTreeMap::new(),
2716                webhook_statistics: BTreeMap::new(),
2717            })),
2718            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2719            statistics_interval_sender,
2720            instances: BTreeMap::new(),
2721            initialized: false,
2722            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2723            persist_location,
2724            persist: persist_clients,
2725            metrics,
2726            recorded_frontiers: BTreeMap::new(),
2727            recorded_replica_frontiers: BTreeMap::new(),
2728            wallclock_lag,
2729            wallclock_lag_last_recorded: now_dt,
2730            storage_collections,
2731            migrated_storage_collections: BTreeSet::new(),
2732            maintenance_ticker,
2733            maintenance_scheduled: false,
2734            instance_response_rx,
2735            instance_response_tx,
2736            persist_warm_task,
2737        }
2738    }
2739
2740    // This is different from `set_read_policies`, which is for external users.
2741    // This method is for setting the policy that the controller uses when
2742    // maintaining the read holds that it has for collections/exports at the
2743    // StorageCollections.
2744    //
2745    // This is really only used when dropping things, where we set the
2746    // ReadPolicy to the empty Antichain.
2747    #[instrument(level = "debug")]
2748    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
2749        let mut read_capability_changes = BTreeMap::default();
2750
2751        for (id, policy) in policies.into_iter() {
2752            if let Some(collection) = self.collections.get_mut(&id) {
2753                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2754                {
2755                    CollectionStateExtra::Ingestion(ingestion) => (
2756                        ingestion.write_frontier.borrow(),
2757                        &mut ingestion.derived_since,
2758                        &mut ingestion.hold_policy,
2759                    ),
2760                    CollectionStateExtra::None => {
2761                        unreachable!("set_hold_policies is only called for ingestions");
2762                    }
2763                    CollectionStateExtra::Export(export) => (
2764                        export.write_frontier.borrow(),
2765                        &mut export.derived_since,
2766                        &mut export.read_policy,
2767                    ),
2768                };
2769
2770                let new_derived_since = policy.frontier(write_frontier);
2771                let mut update = swap_updates(derived_since, new_derived_since);
2772                if !update.is_empty() {
2773                    read_capability_changes.insert(id, update);
2774                }
2775
2776                *hold_policy = policy;
2777            }
2778        }
2779
2780        if !read_capability_changes.is_empty() {
2781            self.update_hold_capabilities(&mut read_capability_changes);
2782        }
2783    }
2784
2785    #[instrument(level = "debug", fields(updates))]
2786    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2787        let mut read_capability_changes = BTreeMap::default();
2788
2789        if let Some(collection) = self.collections.get_mut(&id) {
2790            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2791                CollectionStateExtra::Ingestion(ingestion) => (
2792                    &mut ingestion.write_frontier,
2793                    &mut ingestion.derived_since,
2794                    &ingestion.hold_policy,
2795                ),
2796                CollectionStateExtra::None => {
2797                    if matches!(collection.data_source, DataSource::Progress) {
2798                        // We do get these, but can't do anything with it!
2799                    } else {
2800                        tracing::error!(
2801                            ?collection,
2802                            ?new_upper,
2803                            "updated write frontier for collection which is not an ingestion"
2804                        );
2805                    }
2806                    return;
2807                }
2808                CollectionStateExtra::Export(export) => (
2809                    &mut export.write_frontier,
2810                    &mut export.derived_since,
2811                    &export.read_policy,
2812                ),
2813            };
2814
2815            if PartialOrder::less_than(write_frontier, new_upper) {
2816                write_frontier.clone_from(new_upper);
2817            }
2818
2819            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2820            let mut update = swap_updates(derived_since, new_derived_since);
2821            if !update.is_empty() {
2822                read_capability_changes.insert(id, update);
2823            }
2824        } else if self.dropped_objects.contains_key(&id) {
2825            // We dropped an object but might still get updates from cluster
2826            // side, before it notices the drop. This is expected and fine.
2827        } else {
2828            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2829        }
2830
2831        if !read_capability_changes.is_empty() {
2832            self.update_hold_capabilities(&mut read_capability_changes);
2833        }
2834    }
2835
2836    // This is different from `update_read_capabilities`, which is for external users.
2837    // This method is for maintaining the read holds that the controller has at
2838    // the StorageCollections, for storage dependencies.
2839    #[instrument(level = "debug", fields(updates))]
2840    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2841        // Location to record consequences that we need to act on.
2842        let mut collections_net = BTreeMap::new();
2843
2844        // We must not rely on any specific relative ordering of `GlobalId`s.
2845        // That said, it is reasonable to assume that collections generally have
2846        // greater IDs than their dependencies, so starting with the largest is
2847        // a useful optimization.
2848        while let Some(key) = updates.keys().rev().next().cloned() {
2849            let mut update = updates.remove(&key).unwrap();
2850
2851            if key.is_user() {
2852                debug!(id = %key, ?update, "update_hold_capability");
2853            }
2854
2855            if let Some(collection) = self.collections.get_mut(&key) {
2856                match &mut collection.extra_state {
2857                    CollectionStateExtra::Ingestion(ingestion) => {
2858                        let changes = ingestion.read_capabilities.update_iter(update.drain());
2859                        update.extend(changes);
2860
2861                        let (changes, frontier, _cluster_id) =
2862                            collections_net.entry(key).or_insert_with(|| {
2863                                (
2864                                    <ChangeBatch<_>>::new(),
2865                                    Antichain::new(),
2866                                    ingestion.instance_id,
2867                                )
2868                            });
2869
2870                        changes.extend(update.drain());
2871                        *frontier = ingestion.read_capabilities.frontier().to_owned();
2872                    }
2873                    CollectionStateExtra::None => {
2874                        // WIP: See if this ever panics in ci.
2875                        soft_panic_or_log!(
2876                            "trying to update holds for collection {collection:?} which is not \
2877                             an ingestion: {update:?}"
2878                        );
2879                        continue;
2880                    }
2881                    CollectionStateExtra::Export(export) => {
2882                        let changes = export.read_capabilities.update_iter(update.drain());
2883                        update.extend(changes);
2884
2885                        let (changes, frontier, _cluster_id) =
2886                            collections_net.entry(key).or_insert_with(|| {
2887                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
2888                            });
2889
2890                        changes.extend(update.drain());
2891                        *frontier = export.read_capabilities.frontier().to_owned();
2892                    }
2893                }
2894            } else {
2895                // This is confusing and we should probably error.
2896                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
2897            }
2898        }
2899
2900        // Translate our net compute actions into `AllowCompaction` commands and
2901        // downgrade persist sinces.
2902        for (key, (mut changes, frontier, cluster_id)) in collections_net {
2903            if !changes.is_empty() {
2904                if key.is_user() {
2905                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
2906                }
2907
2908                let collection = self
2909                    .collections
2910                    .get_mut(&key)
2911                    .expect("missing collection state");
2912
2913                let read_holds = match &mut collection.extra_state {
2914                    CollectionStateExtra::Ingestion(ingestion) => {
2915                        ingestion.dependency_read_holds.as_mut_slice()
2916                    }
2917                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
2918                    CollectionStateExtra::None => {
2919                        soft_panic_or_log!(
2920                            "trying to downgrade read holds for collection which is not an \
2921                             ingestion: {collection:?}"
2922                        );
2923                        continue;
2924                    }
2925                };
2926
2927                for read_hold in read_holds.iter_mut() {
2928                    read_hold
2929                        .try_downgrade(frontier.clone())
2930                        .expect("we only advance the frontier");
2931                }
2932
2933                // Send AllowCompaction command directly to the instance
2934                if let Some(instance) = self.instances.get_mut(&cluster_id) {
2935                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
2936                } else {
2937                    soft_panic_or_log!(
2938                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
2939                    );
2940                }
2941            }
2942        }
2943    }
2944
2945    /// Validate that a collection exists for all identifiers, and error if any do not.
2946    fn validate_collection_ids(
2947        &self,
2948        ids: impl Iterator<Item = GlobalId>,
2949    ) -> Result<(), StorageError<T>> {
2950        for id in ids {
2951            self.storage_collections.check_exists(id)?;
2952        }
2953        Ok(())
2954    }
2955
2956    /// Validate that a collection exists for all identifiers, and error if any do not.
2957    fn validate_export_ids(
2958        &self,
2959        ids: impl Iterator<Item = GlobalId>,
2960    ) -> Result<(), StorageError<T>> {
2961        for id in ids {
2962            self.export(id)?;
2963        }
2964        Ok(())
2965    }
2966
2967    /// Opens a write and critical since handles for the given `shard`.
2968    ///
2969    /// `since` is an optional `since` that the read handle will be forwarded to if it is less than
2970    /// its current since.
2971    ///
2972    /// This will `halt!` the process if we cannot successfully acquire a critical handle with our
2973    /// current epoch.
2974    async fn open_data_handles(
2975        &self,
2976        id: &GlobalId,
2977        shard: ShardId,
2978        relation_desc: RelationDesc,
2979        persist_client: &PersistClient,
2980    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
2981        let diagnostics = Diagnostics {
2982            shard_name: id.to_string(),
2983            handle_purpose: format!("controller data for {}", id),
2984        };
2985
2986        let mut write = persist_client
2987            .open_writer(
2988                shard,
2989                Arc::new(relation_desc),
2990                Arc::new(UnitSchema),
2991                diagnostics.clone(),
2992            )
2993            .await
2994            .expect("invalid persist usage");
2995
2996        // N.B.
2997        // Fetch the most recent upper for the write handle. Otherwise, this may be behind
2998        // the since of the since handle. Its vital this happens AFTER we create
2999        // the since handle as it needs to be linearized with that operation. It may be true
3000        // that creating the write handle after the since handle already ensures this, but we
3001        // do this out of an abundance of caution.
3002        //
3003        // Note that this returns the upper, but also sets it on the handle to be fetched later.
3004        write.fetch_recent_upper().await;
3005
3006        write
3007    }
3008
3009    /// Registers the given introspection collection and does any preparatory
3010    /// work that we have to do before we start writing to it. This
3011    /// preparatory work will include partial truncation or other cleanup
3012    /// schemes, depending on introspection type.
3013    fn register_introspection_collection(
3014        &mut self,
3015        id: GlobalId,
3016        introspection_type: IntrospectionType,
3017        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
3018        persist_client: PersistClient,
3019    ) -> Result<(), StorageError<T>> {
3020        tracing::info!(%id, ?introspection_type, "registering introspection collection");
3021
3022        // In read-only mode we create a new shard for all migrated storage collections. So we
3023        // "trick" the write task into thinking that it's not in read-only mode so something is
3024        // advancing this new shard.
3025        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
3026        if force_writable {
3027            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
3028            info!("writing to migrated storage collection {id} in read-only mode");
3029        }
3030
3031        let prev = self.introspection_ids.insert(introspection_type, id);
3032        assert!(
3033            prev.is_none(),
3034            "cannot have multiple IDs for introspection type"
3035        );
3036
3037        let metadata = self.storage_collections.collection_metadata(id)?.clone();
3038
3039        let read_handle_fn = move || {
3040            let persist_client = persist_client.clone();
3041            let metadata = metadata.clone();
3042
3043            let fut = async move {
3044                let read_handle = persist_client
3045                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
3046                        metadata.data_shard,
3047                        Arc::new(metadata.relation_desc.clone()),
3048                        Arc::new(UnitSchema),
3049                        Diagnostics {
3050                            shard_name: id.to_string(),
3051                            handle_purpose: format!("snapshot {}", id),
3052                        },
3053                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3054                    )
3055                    .await
3056                    .expect("invalid persist usage");
3057                read_handle
3058            };
3059
3060            fut.boxed()
3061        };
3062
3063        let recent_upper = write_handle.shared_upper();
3064
3065        match CollectionManagerKind::from(&introspection_type) {
3066            // For these, we first register the collection and then prepare it,
3067            // because the code that prepares differential collection expects to
3068            // be able to update desired state via the collection manager
3069            // already.
3070            CollectionManagerKind::Differential => {
3071                // These do a shallow copy.
3072                let introspection_config = DifferentialIntrospectionConfig {
3073                    recent_upper,
3074                    introspection_type,
3075                    storage_collections: Arc::clone(&self.storage_collections),
3076                    collection_manager: self.collection_manager.clone(),
3077                    source_statistics: Arc::clone(&self.source_statistics),
3078                    sink_statistics: Arc::clone(&self.sink_statistics),
3079                    statistics_interval: self.config.parameters.statistics_interval.clone(),
3080                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
3081                    metrics: self.metrics.clone(),
3082                    introspection_tokens: Arc::clone(&self.introspection_tokens),
3083                };
3084                self.collection_manager.register_differential_collection(
3085                    id,
3086                    write_handle,
3087                    read_handle_fn,
3088                    force_writable,
3089                    introspection_config,
3090                );
3091            }
3092            // For these, we first have to prepare and then register with
3093            // collection manager, because the preparation logic wants to read
3094            // the shard's contents and then do uncontested writes.
3095            //
3096            // TODO(aljoscha): We should make the truncation/cleanup work that
3097            // happens when we take over instead be a periodic thing, and make
3098            // it resilient to the upper moving concurrently.
3099            CollectionManagerKind::AppendOnly => {
3100                let introspection_config = AppendOnlyIntrospectionConfig {
3101                    introspection_type,
3102                    config_set: Arc::clone(self.config.config_set()),
3103                    parameters: self.config.parameters.clone(),
3104                    storage_collections: Arc::clone(&self.storage_collections),
3105                };
3106                self.collection_manager.register_append_only_collection(
3107                    id,
3108                    write_handle,
3109                    force_writable,
3110                    Some(introspection_config),
3111                );
3112            }
3113        }
3114
3115        Ok(())
3116    }
3117
3118    /// Remove statistics for sources/sinks that were dropped but still have statistics rows
3119    /// hanging around.
3120    fn reconcile_dangling_statistics(&self) {
3121        self.source_statistics
3122            .lock()
3123            .expect("poisoned")
3124            .source_statistics
3125            // collections should also contain subsources.
3126            .retain(|k, _| self.storage_collections.check_exists(*k).is_ok());
3127        self.sink_statistics
3128            .lock()
3129            .expect("poisoned")
3130            .retain(|k, _| self.export(*k).is_ok());
3131    }
3132
3133    /// Appends a new global ID, shard ID pair to the appropriate collection.
3134    /// Use a `diff` of 1 to append a new entry; -1 to retract an existing
3135    /// entry.
3136    ///
3137    /// # Panics
3138    /// - If `self.collections` does not have an entry for `global_id`.
3139    /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
3140    ///   a managed collection.
3141    /// - If diff is any value other than `1` or `-1`.
3142    #[instrument(level = "debug")]
3143    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3144    where
3145        I: Iterator<Item = GlobalId>,
3146    {
3147        mz_ore::soft_assert_or_log!(
3148            diff == Diff::MINUS_ONE || diff == Diff::ONE,
3149            "use 1 for insert or -1 for delete"
3150        );
3151
3152        let id = *self
3153            .introspection_ids
3154            .get(&IntrospectionType::ShardMapping)
3155            .expect("should be registered before this call");
3156
3157        let mut updates = vec![];
3158        // Pack updates into rows
3159        let mut row_buf = Row::default();
3160
3161        for global_id in global_ids {
3162            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3163                collection.collection_metadata.data_shard.clone()
3164            } else {
3165                panic!("unknown global id: {}", global_id);
3166            };
3167
3168            let mut packer = row_buf.packer();
3169            packer.push(Datum::from(global_id.to_string().as_str()));
3170            packer.push(Datum::from(shard_id.to_string().as_str()));
3171            updates.push((row_buf.clone(), diff));
3172        }
3173
3174        self.collection_manager.differential_append(id, updates);
3175    }
3176
3177    /// Determines and returns this collection's dependencies, if any.
3178    fn determine_collection_dependencies(
3179        &self,
3180        self_id: GlobalId,
3181        data_source: &DataSource<T>,
3182    ) -> Result<Vec<GlobalId>, StorageError<T>> {
3183        let dependency = match &data_source {
3184            DataSource::Introspection(_)
3185            | DataSource::Webhook
3186            | DataSource::Table { primary: None }
3187            | DataSource::Progress
3188            | DataSource::Other => vec![],
3189            DataSource::Table {
3190                primary: Some(primary),
3191            } => vec![*primary],
3192            DataSource::IngestionExport { ingestion_id, .. } => {
3193                // Ingestion exports depend on their primary source's remap
3194                // collection.
3195                let source_collection = self.collection(*ingestion_id)?;
3196                let ingestion_remap_collection_id = match &source_collection.data_source {
3197                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3198                    _ => unreachable!(
3199                        "SourceExport must only refer to primary sources that already exist"
3200                    ),
3201                };
3202
3203                // Ingestion exports (aka. subsources) must make sure that 1)
3204                // their own collection's since stays one step behind the upper,
3205                // and, 2) that the remap shard's since stays one step behind
3206                // their upper. Hence they track themselves and the remap shard
3207                // as dependencies.
3208                vec![self_id, ingestion_remap_collection_id]
3209            }
3210            // Ingestions depend on their remap collection.
3211            DataSource::Ingestion(ingestion) => {
3212                // Ingestions must make sure that 1) their own collection's
3213                // since stays one step behind the upper, and, 2) that the remap
3214                // shard's since stays one step behind their upper. Hence they
3215                // track themselves and the remap shard as dependencies.
3216                vec![self_id, ingestion.remap_collection_id]
3217            }
3218            DataSource::Sink { desc } => {
3219                // Sinks hold back their own frontier and the frontier of their input.
3220                vec![self_id, desc.sink.from]
3221            }
3222        };
3223
3224        Ok(dependency)
3225    }
3226
3227    async fn read_handle_for_snapshot(
3228        &self,
3229        id: GlobalId,
3230    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3231        let metadata = self.storage_collections.collection_metadata(id)?;
3232        read_handle_for_snapshot(&self.persist, id, &metadata).await
3233    }
3234
3235    /// Handles writing of status updates for sources/sinks to the appropriate
3236    /// status relation
3237    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3238        if self.read_only {
3239            return;
3240        }
3241
3242        let mut sink_status_updates = vec![];
3243        let mut source_status_updates = vec![];
3244
3245        for update in updates {
3246            let id = update.id;
3247            if self.export(id).is_ok() {
3248                sink_status_updates.push(update);
3249            } else if self.storage_collections.check_exists(id).is_ok() {
3250                source_status_updates.push(update);
3251            }
3252        }
3253
3254        self.append_status_introspection_updates(
3255            IntrospectionType::SourceStatusHistory,
3256            source_status_updates,
3257        );
3258        self.append_status_introspection_updates(
3259            IntrospectionType::SinkStatusHistory,
3260            sink_status_updates,
3261        );
3262    }
3263
3264    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3265        self.collections
3266            .get(&id)
3267            .ok_or(StorageError::IdentifierMissing(id))
3268    }
3269
3270    /// Runs the identified ingestion using the current definition of the
3271    /// ingestion in-memory.
3272    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3273        tracing::info!(%id, "starting ingestion");
3274
3275        let collection = self.collection(id)?;
3276        let ingestion_description = match &collection.data_source {
3277            DataSource::Ingestion(i) => i.clone(),
3278            _ => {
3279                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3280                Err(StorageError::IdentifierInvalid(id))?
3281            }
3282        };
3283
3284        // Enrich all of the exports with their metadata
3285        let mut source_exports = BTreeMap::new();
3286        for (
3287            export_id,
3288            SourceExport {
3289                storage_metadata: (),
3290                details,
3291                data_config,
3292            },
3293        ) in ingestion_description.source_exports.clone()
3294        {
3295            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3296            source_exports.insert(
3297                export_id,
3298                SourceExport {
3299                    storage_metadata: export_storage_metadata,
3300                    details,
3301                    data_config,
3302                },
3303            );
3304        }
3305
3306        let description = IngestionDescription::<CollectionMetadata> {
3307            source_exports,
3308            // The ingestion metadata is simply the collection metadata of the collection with
3309            // the associated ingestion
3310            ingestion_metadata: collection.collection_metadata.clone(),
3311            // The rest of the fields are identical
3312            desc: ingestion_description.desc.clone(),
3313            instance_id: ingestion_description.instance_id,
3314            remap_collection_id: ingestion_description.remap_collection_id,
3315        };
3316
3317        let storage_instance_id = description.instance_id;
3318        // Fetch the client for this ingestion's instance.
3319        let instance = self
3320            .instances
3321            .get_mut(&storage_instance_id)
3322            .ok_or_else(|| StorageError::IngestionInstanceMissing {
3323                storage_instance_id,
3324                ingestion_id: id,
3325            })?;
3326
3327        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3328        instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3329
3330        Ok(())
3331    }
3332
3333    /// Runs the identified export using the current definition of the export
3334    /// that we have in memory.
3335    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3336        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3337            return Err(StorageError::IdentifierMissing(id));
3338        };
3339
3340        let from_storage_metadata = self
3341            .storage_collections
3342            .collection_metadata(description.sink.from)?;
3343        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3344
3345        // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
3346        // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
3347        // reading it; otherwise assume we may have to replay from the beginning.
3348        let enable_snapshot_frontier =
3349            dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER.get(self.config().config_set());
3350        let export_state = self.storage_collections.collection_frontiers(id)?;
3351        let mut as_of = description.sink.as_of.clone();
3352        as_of.join_assign(&export_state.implied_capability);
3353        let with_snapshot = if enable_snapshot_frontier
3354            && PartialOrder::less_than(&as_of, &export_state.write_frontier)
3355        {
3356            false
3357        } else {
3358            description.sink.with_snapshot
3359        };
3360
3361        info!(
3362            sink_id = %id,
3363            from_id = %description.sink.from,
3364            write_frontier = ?export_state.write_frontier,
3365            ?as_of,
3366            ?with_snapshot,
3367            "run_export"
3368        );
3369
3370        let cmd = RunSinkCommand {
3371            id,
3372            description: StorageSinkDesc {
3373                from: description.sink.from,
3374                from_desc: description.sink.from_desc.clone(),
3375                connection: description.sink.connection.clone(),
3376                envelope: description.sink.envelope,
3377                as_of,
3378                version: description.sink.version,
3379                from_storage_metadata,
3380                with_snapshot,
3381                to_storage_metadata,
3382            },
3383        };
3384
3385        let storage_instance_id = description.instance_id.clone();
3386
3387        let instance = self
3388            .instances
3389            .get_mut(&storage_instance_id)
3390            .ok_or_else(|| StorageError::ExportInstanceMissing {
3391                storage_instance_id,
3392                export_id: id,
3393            })?;
3394
3395        instance.send(StorageCommand::RunSink(Box::new(cmd)));
3396
3397        Ok(())
3398    }
3399
3400    /// Update introspection with the current frontiers of storage objects.
3401    ///
3402    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3403    /// second during normal operation.
3404    fn update_frontier_introspection(&mut self) {
3405        let mut global_frontiers = BTreeMap::new();
3406        let mut replica_frontiers = BTreeMap::new();
3407
3408        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
3409            let id = collection_frontiers.id;
3410            let since = collection_frontiers.read_capabilities;
3411            let upper = collection_frontiers.write_frontier;
3412
3413            let instance = self
3414                .collections
3415                .get(&id)
3416                .and_then(|collection_state| match &collection_state.extra_state {
3417                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
3418                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
3419                    CollectionStateExtra::None => None,
3420                })
3421                .and_then(|i| self.instances.get(&i));
3422
3423            if let Some(instance) = instance {
3424                for replica_id in instance.replica_ids() {
3425                    replica_frontiers.insert((id, replica_id), upper.clone());
3426                }
3427            }
3428
3429            global_frontiers.insert(id, (since, upper));
3430        }
3431
3432        let mut global_updates = Vec::new();
3433        let mut replica_updates = Vec::new();
3434
3435        let mut push_global_update =
3436            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
3437                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
3438                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3439                let row = Row::pack_slice(&[
3440                    Datum::String(&id.to_string()),
3441                    read_frontier,
3442                    write_frontier,
3443                ]);
3444                global_updates.push((row, diff));
3445            };
3446
3447        let mut push_replica_update =
3448            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
3449                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3450                let row = Row::pack_slice(&[
3451                    Datum::String(&id.to_string()),
3452                    Datum::String(&replica_id.to_string()),
3453                    write_frontier,
3454                ]);
3455                replica_updates.push((row, diff));
3456            };
3457
3458        let mut old_global_frontiers =
3459            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
3460        for (&id, new) in &self.recorded_frontiers {
3461            match old_global_frontiers.remove(&id) {
3462                Some(old) if &old != new => {
3463                    push_global_update(id, new.clone(), Diff::ONE);
3464                    push_global_update(id, old, Diff::MINUS_ONE);
3465                }
3466                Some(_) => (),
3467                None => push_global_update(id, new.clone(), Diff::ONE),
3468            }
3469        }
3470        for (id, old) in old_global_frontiers {
3471            push_global_update(id, old, Diff::MINUS_ONE);
3472        }
3473
3474        let mut old_replica_frontiers =
3475            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
3476        for (&key, new) in &self.recorded_replica_frontiers {
3477            match old_replica_frontiers.remove(&key) {
3478                Some(old) if &old != new => {
3479                    push_replica_update(key, new.clone(), Diff::ONE);
3480                    push_replica_update(key, old, Diff::MINUS_ONE);
3481                }
3482                Some(_) => (),
3483                None => push_replica_update(key, new.clone(), Diff::ONE),
3484            }
3485        }
3486        for (key, old) in old_replica_frontiers {
3487            push_replica_update(key, old, Diff::MINUS_ONE);
3488        }
3489
3490        let id = self.introspection_ids[&IntrospectionType::Frontiers];
3491        self.collection_manager
3492            .differential_append(id, global_updates);
3493
3494        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
3495        self.collection_manager
3496            .differential_append(id, replica_updates);
3497    }
3498
3499    /// Refresh the wallclock lag introspection and metrics with the current lag values.
3500    ///
3501    /// We measure the lag of write frontiers behind the wallclock time every second and track the
3502    /// maximum over 60 measurements (i.e., one minute). Every minute, we emit a new lag event to
3503    /// the `WallclockLagHistory` introspection with the current maximum.
3504    ///
3505    /// This method is invoked by `ComputeController::maintain`, which we expect to be called once
3506    /// per second during normal operation.
3507    fn refresh_wallclock_lag(&mut self) {
3508        // Record lags to persist, if it's time.
3509        self.maybe_record_wallclock_lag();
3510
3511        let now_ms = (self.now)();
3512        let histogram_period =
3513            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3514
3515        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
3516            Some(ts) => (self.wallclock_lag)(ts.clone()),
3517            None => Duration::ZERO,
3518        };
3519
3520        for frontiers in self.storage_collections.active_collection_frontiers() {
3521            let id = frontiers.id;
3522            let Some(collection) = self.collections.get_mut(&id) else {
3523                continue;
3524            };
3525
3526            let lag = frontier_lag(&frontiers.write_frontier);
3527            collection.wallclock_lag_max = std::cmp::max(collection.wallclock_lag_max, lag);
3528            collection.wallclock_lag_metrics.observe(lag);
3529
3530            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
3531                continue;
3532            }
3533
3534            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
3535                let bucket = lag.as_secs().next_power_of_two();
3536
3537                let instance_id = match &collection.extra_state {
3538                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3539                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3540                    CollectionStateExtra::None => None,
3541                };
3542                let workload_class = instance_id
3543                    .and_then(|id| self.instances.get(&id))
3544                    .and_then(|i| i.workload_class.clone());
3545                let labels = match workload_class {
3546                    Some(wc) => [("workload_class", wc.clone())].into(),
3547                    None => BTreeMap::new(),
3548                };
3549
3550                let key = (histogram_period, bucket, labels);
3551                *stash.entry(key).or_default() += Diff::ONE;
3552            }
3553        }
3554    }
3555
3556    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
3557    /// last recording.
3558    ///
3559    /// We emit new introspection updates if the system time has passed into a new multiple of the
3560    /// recording interval (typically 1 minute) since the last refresh. The compute controller uses
3561    /// the same approach, ensuring that both controllers commit their lags at roughly the same
3562    /// time, avoiding confusion caused by inconsistencies.
3563    fn maybe_record_wallclock_lag(&mut self) {
3564        if self.read_only {
3565            return;
3566        }
3567
3568        let duration_trunc = |datetime: DateTime<_>, interval| {
3569            let td = TimeDelta::from_std(interval).ok()?;
3570            datetime.duration_trunc(td).ok()
3571        };
3572
3573        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3574        let now_dt = mz_ore::now::to_datetime((self.now)());
3575        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3576            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3577            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3578            duration_trunc(now_dt, *default).unwrap()
3579        });
3580        if now_trunc <= self.wallclock_lag_last_recorded {
3581            return;
3582        }
3583
3584        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3585
3586        let mut history_updates = Vec::new();
3587        let mut histogram_updates = Vec::new();
3588        let mut row_buf = Row::default();
3589        for frontiers in self.storage_collections.active_collection_frontiers() {
3590            let id = frontiers.id;
3591            let Some(collection) = self.collections.get_mut(&id) else {
3592                continue;
3593            };
3594
3595            let max_lag = std::mem::take(&mut collection.wallclock_lag_max);
3596            let max_lag_us = i64::try_from(max_lag.as_micros()).expect("must fit");
3597            let row = Row::pack_slice(&[
3598                Datum::String(&id.to_string()),
3599                Datum::Null,
3600                Datum::Interval(Interval::new(0, 0, max_lag_us)),
3601                Datum::TimestampTz(now_ts),
3602            ]);
3603            history_updates.push((row, Diff::ONE));
3604
3605            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3606                continue;
3607            };
3608
3609            for ((period, lag, labels), count) in std::mem::take(stash) {
3610                let mut packer = row_buf.packer();
3611                packer.extend([
3612                    Datum::TimestampTz(period.start),
3613                    Datum::TimestampTz(period.end),
3614                    Datum::String(&id.to_string()),
3615                    Datum::UInt64(lag),
3616                ]);
3617                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3618                packer.push_dict(labels);
3619
3620                histogram_updates.push((row_buf.clone(), count));
3621            }
3622        }
3623
3624        if !history_updates.is_empty() {
3625            self.append_introspection_updates(
3626                IntrospectionType::WallclockLagHistory,
3627                history_updates,
3628            );
3629        }
3630        if !histogram_updates.is_empty() {
3631            self.append_introspection_updates(
3632                IntrospectionType::WallclockLagHistogram,
3633                histogram_updates,
3634            );
3635        }
3636
3637        self.wallclock_lag_last_recorded = now_trunc;
3638    }
3639
3640    /// Run periodic tasks.
3641    ///
3642    /// This method is invoked roughly once per second during normal operation. It is a good place
3643    /// for tasks that need to run periodically, such as state cleanup or updating of metrics.
3644    fn maintain(&mut self) {
3645        self.update_frontier_introspection();
3646        self.refresh_wallclock_lag();
3647
3648        // Perform instance maintenance work.
3649        for instance in self.instances.values_mut() {
3650            instance.refresh_state_metrics();
3651        }
3652    }
3653}
3654
3655impl From<&IntrospectionType> for CollectionManagerKind {
3656    fn from(value: &IntrospectionType) -> Self {
3657        match value {
3658            IntrospectionType::ShardMapping
3659            | IntrospectionType::Frontiers
3660            | IntrospectionType::ReplicaFrontiers
3661            | IntrospectionType::StorageSourceStatistics
3662            | IntrospectionType::StorageSinkStatistics
3663            | IntrospectionType::ComputeDependencies
3664            | IntrospectionType::ComputeOperatorHydrationStatus
3665            | IntrospectionType::ComputeMaterializedViewRefreshes
3666            | IntrospectionType::ComputeErrorCounts
3667            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3668
3669            IntrospectionType::SourceStatusHistory
3670            | IntrospectionType::SinkStatusHistory
3671            | IntrospectionType::PrivatelinkConnectionStatusHistory
3672            | IntrospectionType::ReplicaStatusHistory
3673            | IntrospectionType::ReplicaMetricsHistory
3674            | IntrospectionType::WallclockLagHistory
3675            | IntrospectionType::WallclockLagHistogram
3676            | IntrospectionType::PreparedStatementHistory
3677            | IntrospectionType::StatementExecutionHistory
3678            | IntrospectionType::SessionHistory
3679            | IntrospectionType::StatementLifecycleHistory
3680            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3681        }
3682    }
3683}
3684
3685/// Get the current rows in the given statistics table. This is used to bootstrap
3686/// the statistics tasks.
3687///
3688// TODO(guswynn): we need to be more careful about the update time we get here:
3689// <https://github.com/MaterializeInc/database-issues/issues/7564>
3690async fn snapshot_statistics<T>(
3691    id: GlobalId,
3692    upper: Antichain<T>,
3693    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3694) -> Vec<Row>
3695where
3696    T: Codec64 + From<EpochMillis> + TimestampManipulation,
3697{
3698    match upper.as_option() {
3699        Some(f) if f > &T::minimum() => {
3700            let as_of = f.step_back().unwrap();
3701
3702            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3703            snapshot
3704                .into_iter()
3705                .map(|(row, diff)| {
3706                    assert_eq!(diff, 1);
3707                    row
3708                })
3709                .collect()
3710        }
3711        // If collection is closed or the frontier is the minimum, we cannot
3712        // or don't need to truncate (respectively).
3713        _ => Vec::new(),
3714    }
3715}
3716
3717async fn read_handle_for_snapshot<T>(
3718    persist: &PersistClientCache,
3719    id: GlobalId,
3720    metadata: &CollectionMetadata,
3721) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3722where
3723    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3724{
3725    let persist_client = persist
3726        .open(metadata.persist_location.clone())
3727        .await
3728        .unwrap();
3729
3730    // We create a new read handle every time someone requests a snapshot and then immediately
3731    // expire it instead of keeping a read handle permanently in our state to avoid having it
3732    // heartbeat continously. The assumption is that calls to snapshot are rare and therefore
3733    // worth it to always create a new handle.
3734    let read_handle = persist_client
3735        .open_leased_reader::<SourceData, (), _, _>(
3736            metadata.data_shard,
3737            Arc::new(metadata.relation_desc.clone()),
3738            Arc::new(UnitSchema),
3739            Diagnostics {
3740                shard_name: id.to_string(),
3741                handle_purpose: format!("snapshot {}", id),
3742            },
3743            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3744        )
3745        .await
3746        .expect("invalid persist usage");
3747    Ok(read_handle)
3748}
3749
3750/// State maintained about individual collections.
3751#[derive(Debug)]
3752struct CollectionState<T: TimelyTimestamp> {
3753    /// The source of this collection's data.
3754    pub data_source: DataSource<T>,
3755
3756    pub collection_metadata: CollectionMetadata,
3757
3758    pub extra_state: CollectionStateExtra<T>,
3759
3760    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3761    wallclock_lag_max: Duration,
3762    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
3763    /// introspection update.
3764    ///
3765    /// Keys are `(period, lag, labels)` triples, values are counts.
3766    ///
3767    /// If this is `None`, wallclock lag is not tracked for this collection.
3768    wallclock_lag_histogram_stash: Option<
3769        BTreeMap<
3770            (
3771                WallclockLagHistogramPeriod,
3772                u64,
3773                BTreeMap<&'static str, String>,
3774            ),
3775            Diff,
3776        >,
3777    >,
3778    /// Frontier wallclock lag metrics tracked for this collection.
3779    wallclock_lag_metrics: WallclockLagMetrics,
3780}
3781
3782impl<T: TimelyTimestamp> CollectionState<T> {
3783    fn new(
3784        data_source: DataSource<T>,
3785        collection_metadata: CollectionMetadata,
3786        extra_state: CollectionStateExtra<T>,
3787        wallclock_lag_metrics: WallclockLagMetrics,
3788    ) -> Self {
3789        // Only collect wallclock lag histogram data for collections written by storage, to avoid
3790        // duplicate measurements. Collections written by other components (e.g. compute) have
3791        // their wallclock lags recorded by these components.
3792        let wallclock_lag_histogram_stash = match &data_source {
3793            DataSource::Other => None,
3794            _ => Some(Default::default()),
3795        };
3796
3797        Self {
3798            data_source,
3799            collection_metadata,
3800            extra_state,
3801            wallclock_lag_max: Default::default(),
3802            wallclock_lag_histogram_stash,
3803            wallclock_lag_metrics,
3804        }
3805    }
3806}
3807
3808/// Additional state that the controller maintains for select collection types.
3809#[derive(Debug)]
3810enum CollectionStateExtra<T: TimelyTimestamp> {
3811    Ingestion(IngestionState<T>),
3812    Export(ExportState<T>),
3813    None,
3814}
3815
3816/// State maintained about ingestions and ingestion exports
3817#[derive(Debug)]
3818struct IngestionState<T: TimelyTimestamp> {
3819    /// Really only for keeping track of changes to the `derived_since`.
3820    pub read_capabilities: MutableAntichain<T>,
3821
3822    /// The current since frontier, derived from `write_frontier` using
3823    /// `hold_policy`.
3824    pub derived_since: Antichain<T>,
3825
3826    /// Holds that this ingestion (or ingestion export) has on its dependencies.
3827    pub dependency_read_holds: Vec<ReadHold<T>>,
3828
3829    /// Reported write frontier.
3830    pub write_frontier: Antichain<T>,
3831
3832    /// The policy that drives how we downgrade our read hold. That is how we
3833    /// derive our since from our upper.
3834    ///
3835    /// This is a _storage-controller-internal_ policy used to derive its
3836    /// personal read hold on the collection. It should not be confused with any
3837    /// read policies that the adapter might install at [StorageCollections].
3838    pub hold_policy: ReadPolicy<T>,
3839
3840    /// The ID of the instance in which the ingestion is running.
3841    pub instance_id: StorageInstanceId,
3842
3843    /// Set of replica IDs on which this ingestion is hydrated.
3844    pub hydrated_on: BTreeSet<ReplicaId>,
3845}
3846
3847/// A description of a status history collection.
3848///
3849/// Used to inform partial truncation, see
3850/// [`collection_mgmt::partially_truncate_status_history`].
3851struct StatusHistoryDesc<K> {
3852    retention_policy: StatusHistoryRetentionPolicy,
3853    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
3854    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
3855}
3856enum StatusHistoryRetentionPolicy {
3857    // Truncates everything but the last N updates for each key.
3858    LastN(usize),
3859    // Truncates everything past the time window for each key.
3860    TimeWindow(Duration),
3861}
3862
3863fn source_status_history_desc(
3864    params: &StorageParameters,
3865) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3866    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3867    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3868    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3869    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3870
3871    StatusHistoryDesc {
3872        retention_policy: StatusHistoryRetentionPolicy::LastN(
3873            params.keep_n_source_status_history_entries,
3874        ),
3875        extract_key: Box::new(move |datums| {
3876            (
3877                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3878                if datums[replica_id_idx].is_null() {
3879                    None
3880                } else {
3881                    Some(
3882                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3883                            .expect("ReplicaId column"),
3884                    )
3885                },
3886            )
3887        }),
3888        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3889    }
3890}
3891
3892fn sink_status_history_desc(
3893    params: &StorageParameters,
3894) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3895    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3896    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3897    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3898    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3899
3900    StatusHistoryDesc {
3901        retention_policy: StatusHistoryRetentionPolicy::LastN(
3902            params.keep_n_sink_status_history_entries,
3903        ),
3904        extract_key: Box::new(move |datums| {
3905            (
3906                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
3907                if datums[replica_id_idx].is_null() {
3908                    None
3909                } else {
3910                    Some(
3911                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3912                            .expect("ReplicaId column"),
3913                    )
3914                },
3915            )
3916        }),
3917        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3918    }
3919}
3920
3921fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
3922    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
3923    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
3924    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3925
3926    StatusHistoryDesc {
3927        retention_policy: StatusHistoryRetentionPolicy::LastN(
3928            params.keep_n_privatelink_status_history_entries,
3929        ),
3930        extract_key: Box::new(move |datums| {
3931            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
3932        }),
3933        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3934    }
3935}
3936
3937fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
3938    let desc = &REPLICA_STATUS_HISTORY_DESC;
3939    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3940    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
3941    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3942
3943    StatusHistoryDesc {
3944        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
3945            params.replica_status_history_retention_window,
3946        ),
3947        extract_key: Box::new(move |datums| {
3948            (
3949                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
3950                datums[process_idx].unwrap_uint64(),
3951            )
3952        }),
3953        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3954    }
3955}
3956
3957/// Replace one antichain with another, tracking the overall changes in the returned `ChangeBatch`.
3958fn swap_updates<T: Timestamp>(
3959    from: &mut Antichain<T>,
3960    mut replace_with: Antichain<T>,
3961) -> ChangeBatch<T> {
3962    let mut update = ChangeBatch::new();
3963    if PartialOrder::less_equal(from, &replace_with) {
3964        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
3965        std::mem::swap(from, &mut replace_with);
3966        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
3967    }
3968    update
3969}