mz_storage_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the storage controller trait.
11
12use std::any::Any;
13use std::collections::btree_map;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::{Debug, Display};
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use async_trait::async_trait;
21use chrono::{DateTime, DurationRound, TimeDelta, Utc};
22use derivative::Derivative;
23use differential_dataflow::lattice::Lattice;
24use futures::FutureExt;
25use futures::StreamExt;
26use itertools::Itertools;
27use mz_build_info::BuildInfo;
28use mz_cluster_client::client::ClusterReplicaLocation;
29use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
30use mz_cluster_client::{ReplicaId, WallclockLagFn};
31use mz_controller_types::dyncfgs::{
32    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
33    WALLCLOCK_LAG_RECORDING_INTERVAL,
34};
35use mz_ore::collections::CollectionExt;
36use mz_ore::metrics::MetricsRegistry;
37use mz_ore::now::{EpochMillis, NowFn};
38use mz_ore::task::AbortOnDropHandle;
39use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
40use mz_persist_client::batch::ProtoBatch;
41use mz_persist_client::cache::PersistClientCache;
42use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
43use mz_persist_client::read::ReadHandle;
44use mz_persist_client::schema::CaESchema;
45use mz_persist_client::write::WriteHandle;
46use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
47use mz_persist_types::Codec64;
48use mz_persist_types::codec_impls::UnitSchema;
49use mz_proto::RustType;
50use mz_repr::adt::timestamp::CheckedTimestamp;
51use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
52use mz_storage_client::client::{
53    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
54    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
55    TableData,
56};
57use mz_storage_client::controller::{
58    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
59    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
60    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
61};
62use mz_storage_client::healthcheck::{
63    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
64    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
65};
66use mz_storage_client::metrics::StorageControllerMetrics;
67use mz_storage_client::statistics::{
68    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
69};
70use mz_storage_client::storage_collections::StorageCollections;
71use mz_storage_types::configuration::StorageConfiguration;
72use mz_storage_types::connections::ConnectionContext;
73use mz_storage_types::connections::inline::InlinedConnection;
74use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
75use mz_storage_types::instances::StorageInstanceId;
76use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
77use mz_storage_types::parameters::StorageParameters;
78use mz_storage_types::read_holds::ReadHold;
79use mz_storage_types::read_policy::ReadPolicy;
80use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
81use mz_storage_types::sources::{
82    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
83    SourceExport, SourceExportDataConfig,
84};
85use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
86use mz_txn_wal::metrics::Metrics as TxnMetrics;
87use mz_txn_wal::txn_read::TxnsRead;
88use mz_txn_wal::txns::TxnsHandle;
89use timely::order::{PartialOrder, TotalOrder};
90use timely::progress::Timestamp as TimelyTimestamp;
91use timely::progress::frontier::MutableAntichain;
92use timely::progress::{Antichain, ChangeBatch, Timestamp};
93use tokio::sync::watch::{Sender, channel};
94use tokio::sync::{mpsc, oneshot};
95use tokio::time::MissedTickBehavior;
96use tokio::time::error::Elapsed;
97use tracing::{debug, info, warn};
98
99use crate::collection_mgmt::{
100    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
101};
102use crate::instance::{Instance, ReplicaConfig};
103
104mod collection_mgmt;
105mod history;
106mod instance;
107mod persist_handles;
108mod rtr;
109mod statistics;
110
111#[derive(Derivative)]
112#[derivative(Debug)]
113struct PendingOneshotIngestion {
114    /// Callback used to provide results of the ingestion.
115    #[derivative(Debug = "ignore")]
116    result_tx: OneshotResultCallback<ProtoBatch>,
117    /// Cluster currently running this ingestion
118    cluster_id: StorageInstanceId,
119}
120
121impl PendingOneshotIngestion {
122    /// Consume the pending ingestion, responding with a cancelation message.
123    ///
124    /// TODO(cf2): Refine these error messages so they're not stringly typed.
125    pub(crate) fn cancel(self) {
126        (self.result_tx)(vec![Err("canceled".to_string())])
127    }
128}
129
130/// A storage controller for a storage instance.
131#[derive(Derivative)]
132#[derivative(Debug)]
133pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
134{
135    /// The build information for this process.
136    build_info: &'static BuildInfo,
137    /// A function that returns the current time.
138    now: NowFn,
139
140    /// Whether or not this controller is in read-only mode.
141    ///
142    /// When in read-only mode, neither this controller nor the instances
143    /// controlled by it are allowed to affect changes to external systems
144    /// (largely persist).
145    read_only: bool,
146
147    /// Collections maintained by the storage controller.
148    ///
149    /// This collection only grows, although individual collections may be rendered unusable.
150    /// This is to prevent the re-binding of identifiers to other descriptions.
151    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,
152
153    /// Map from IDs of objects that have been dropped to replicas we are still
154    /// expecting DroppedId messages from. This is cleared out once all replicas
155    /// have responded.
156    ///
157    /// We use this only to catch problems in the protocol between controller
158    /// and replicas, for example we can differentiate between late messages for
159    /// objects that have already been dropped and unexpected (read erroneous)
160    /// messages from the replica.
161    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,
162
163    /// Write handle for table shards.
164    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
165    /// A shared TxnsCache running in a task and communicated with over a channel.
166    txns_read: TxnsRead<T>,
167    txns_metrics: Arc<TxnMetrics>,
168    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
169    /// Channel for sending table handle drops.
170    #[derivative(Debug = "ignore")]
171    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
172    /// Channel for receiving table handle drops.
173    #[derivative(Debug = "ignore")]
174    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
175    /// Closures that can be used to send responses from oneshot ingestions.
176    #[derivative(Debug = "ignore")]
177    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,
178
179    /// Interface for managed collections
180    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
181
182    /// Tracks which collection is responsible for which [`IntrospectionType`].
183    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
184    /// Tokens for tasks that drive updating introspection collections. Dropping
185    /// this will make sure that any tasks (or other resources) will stop when
186    /// needed.
187    // TODO(aljoscha): Should these live somewhere else?
188    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
189
190    // The following two fields must always be locked in order.
191    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
192    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s, as well
193    /// as webhook statistics.
194    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
195    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
196    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s.
197    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
198    /// A way to update the statistics interval in the statistics tasks.
199    statistics_interval_sender: Sender<Duration>,
200
201    /// Clients for all known storage instances.
202    instances: BTreeMap<StorageInstanceId, Instance<T>>,
203    /// Set to `true` once `initialization_complete` has been called.
204    initialized: bool,
205    /// Storage configuration to apply to newly provisioned instances, and use during purification.
206    config: StorageConfiguration,
207    /// The persist location where all storage collections are being written to
208    persist_location: PersistLocation,
209    /// A persist client used to write to storage collections
210    persist: Arc<PersistClientCache>,
211    /// Metrics of the Storage controller
212    metrics: StorageControllerMetrics,
213    /// `(read, write)` frontiers that have been recorded in the `Frontiers` collection, kept to be
214    /// able to retract old rows.
215    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
216    /// Write frontiers that have been recorded in the `ReplicaFrontiers` collection, kept to be
217    /// able to retract old rows.
218    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,
219
220    /// A function that computes the lag between the given time and wallclock time.
221    #[derivative(Debug = "ignore")]
222    wallclock_lag: WallclockLagFn<T>,
223    /// The last time wallclock lag introspection was recorded.
224    wallclock_lag_last_recorded: DateTime<Utc>,
225
226    /// Handle to a [StorageCollections].
227    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
228    /// Migrated storage collections that can be written even in read only mode.
229    migrated_storage_collections: BTreeSet<GlobalId>,
230
231    /// Ticker for scheduling periodic maintenance work.
232    maintenance_ticker: tokio::time::Interval,
233    /// Whether maintenance work was scheduled.
234    maintenance_scheduled: bool,
235
236    /// Shared transmit channel for replicas to send responses.
237    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
238    /// Receive end for replica responses.
239    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,
240
241    /// Background task run at startup to warm persist state.
242    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
243}
244
245/// Warm up persist state for `shard_ids` in a background task.
246///
247/// With better parallelism during startup this would likely be unnecessary, but empirically we see
248/// some nice speedups with this relatively simple function.
249fn warm_persist_state_in_background(
250    client: PersistClient,
251    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
252) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
253    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
254    const MAX_CONCURRENT_WARMS: usize = 16;
255    let logic = async move {
256        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
257            .map(|shard_id| {
258                let client = client.clone();
259                async move {
260                    client
261                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
262                            shard_id,
263                            Arc::new(RelationDesc::empty()),
264                            Arc::new(UnitSchema),
265                            true,
266                            Diagnostics::from_purpose("warm persist load state"),
267                        )
268                        .await
269                }
270            })
271            .buffer_unordered(MAX_CONCURRENT_WARMS)
272            .collect()
273            .await;
274        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
275        fetchers
276    };
277    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
278}
279
280#[async_trait(?Send)]
281impl<T> StorageController for Controller<T>
282where
283    T: Timestamp
284        + Lattice
285        + TotalOrder
286        + Codec64
287        + From<EpochMillis>
288        + TimestampManipulation
289        + Into<Datum<'static>>
290        + Display,
291    StorageCommand<T>: RustType<ProtoStorageCommand>,
292    StorageResponse<T>: RustType<ProtoStorageResponse>,
293{
294    type Timestamp = T;
295
296    fn initialization_complete(&mut self) {
297        self.reconcile_dangling_statistics();
298        self.initialized = true;
299
300        for instance in self.instances.values_mut() {
301            instance.send(StorageCommand::InitializationComplete);
302        }
303    }
304
305    fn update_parameters(&mut self, config_params: StorageParameters) {
306        self.storage_collections
307            .update_parameters(config_params.clone());
308
309        // We serialize the dyncfg updates in StorageParameters, but configure
310        // persist separately.
311        self.persist.cfg().apply_from(&config_params.dyncfg_updates);
312
313        for instance in self.instances.values_mut() {
314            let params = Box::new(config_params.clone());
315            instance.send(StorageCommand::UpdateConfiguration(params));
316        }
317        self.config.update(config_params);
318        self.statistics_interval_sender
319            .send_replace(self.config.parameters.statistics_interval);
320        self.collection_manager.update_user_batch_duration(
321            self.config
322                .parameters
323                .user_storage_managed_collections_batch_duration,
324        );
325    }
326
327    /// Get the current configuration
328    fn config(&self) -> &StorageConfiguration {
329        &self.config
330    }
331
332    fn collection_metadata(
333        &self,
334        id: GlobalId,
335    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
336        self.storage_collections.collection_metadata(id)
337    }
338
339    fn collection_hydrated(
340        &self,
341        collection_id: GlobalId,
342    ) -> Result<bool, StorageError<Self::Timestamp>> {
343        let collection = self.collection(collection_id)?;
344
345        let instance_id = match &collection.data_source {
346            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
347            DataSource::IngestionExport { ingestion_id, .. } => {
348                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
349
350                let instance_id = match &ingestion_state.data_source {
351                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
352                    _ => unreachable!("SourceExport must only refer to primary source"),
353                };
354
355                instance_id
356            }
357            _ => return Ok(true),
358        };
359
360        let instance = self.instances.get(&instance_id).ok_or_else(|| {
361            StorageError::IngestionInstanceMissing {
362                storage_instance_id: instance_id,
363                ingestion_id: collection_id,
364            }
365        })?;
366
367        if instance.replica_ids().next().is_none() {
368            // Ingestions on zero-replica clusters are always considered
369            // hydrated.
370            return Ok(true);
371        }
372
373        match &collection.extra_state {
374            CollectionStateExtra::Ingestion(ingestion_state) => {
375                // An ingestion is hydrated if it is hydrated on at least one replica.
376                Ok(ingestion_state.hydrated_on.len() >= 1)
377            }
378            CollectionStateExtra::Export(_) => {
379                // For now, sinks are always considered hydrated. We rely on
380                // them starting up "instantly" and don't wait for them when
381                // checking hydration status of a replica.  TODO(sinks): base
382                // this off of the sink shard's frontier?
383                Ok(true)
384            }
385            CollectionStateExtra::None => {
386                // For now, objects that are not ingestions are always
387                // considered hydrated. This is tables and webhooks, as of
388                // today.
389                Ok(true)
390            }
391        }
392    }
393
394    #[mz_ore::instrument(level = "debug")]
395    fn collections_hydrated_on_replicas(
396        &self,
397        target_replica_ids: Option<Vec<ReplicaId>>,
398        target_cluster_id: &StorageInstanceId,
399        exclude_collections: &BTreeSet<GlobalId>,
400    ) -> Result<bool, StorageError<Self::Timestamp>> {
401        // If an empty set of replicas is provided there can be
402        // no collections on them, we'll count this as hydrated.
403        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
404            return Ok(true);
405        }
406
407        // If target_replica_ids is provided, use it as the set of target
408        // replicas. Otherwise check for hydration on any replica.
409        let target_replicas: Option<BTreeSet<ReplicaId>> =
410            target_replica_ids.map(|ids| ids.into_iter().collect());
411
412        let mut all_hydrated = true;
413        for (collection_id, collection_state) in self.collections.iter() {
414            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
415                continue;
416            }
417            let hydrated = match &collection_state.extra_state {
418                CollectionStateExtra::Ingestion(state) => {
419                    if &state.instance_id != target_cluster_id {
420                        continue;
421                    }
422                    match &target_replicas {
423                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
424                        None => {
425                            // Not target replicas, so check that it's hydrated
426                            // on at least one replica.
427                            state.hydrated_on.len() >= 1
428                        }
429                    }
430                }
431                CollectionStateExtra::Export(_) => {
432                    // For now, sinks are always considered hydrated. We rely on
433                    // them starting up "instantly" and don't wait for them when
434                    // checking hydration status of a replica.  TODO(sinks):
435                    // base this off of the sink shard's frontier?
436                    true
437                }
438                CollectionStateExtra::None => {
439                    // For now, objects that are not ingestions are always
440                    // considered hydrated. This is tables and webhooks, as of
441                    // today.
442                    true
443                }
444            };
445            if !hydrated {
446                tracing::info!(%collection_id, "collection is not hydrated on any replica");
447                all_hydrated = false;
448                // We continue with our loop instead of breaking out early, so
449                // that we log all non-hydrated replicas.
450            }
451        }
452        Ok(all_hydrated)
453    }
454
455    fn collection_frontiers(
456        &self,
457        id: GlobalId,
458    ) -> Result<
459        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
460        StorageError<Self::Timestamp>,
461    > {
462        let frontiers = self.storage_collections.collection_frontiers(id)?;
463        Ok((frontiers.implied_capability, frontiers.write_frontier))
464    }
465
466    fn collections_frontiers(
467        &self,
468        mut ids: Vec<GlobalId>,
469    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
470        let mut result = vec![];
471        // In theory, we could pull all our frontiers from storage collections...
472        // but in practice those frontiers may not be identical. For historical reasons, we use the
473        // locally-tracked frontier for sinks but the storage-collections-maintained frontier for
474        // sources.
475        ids.retain(|&id| match self.export(id) {
476            Ok(export) => {
477                result.push((
478                    id,
479                    export.input_hold().since().clone(),
480                    export.write_frontier.clone(),
481                ));
482                false
483            }
484            Err(_) => true,
485        });
486        result.extend(
487            self.storage_collections
488                .collections_frontiers(ids)?
489                .into_iter()
490                .map(|frontiers| {
491                    (
492                        frontiers.id,
493                        frontiers.implied_capability,
494                        frontiers.write_frontier,
495                    )
496                }),
497        );
498
499        Ok(result)
500    }
501
502    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
503        self.storage_collections.active_collection_metadatas()
504    }
505
506    fn active_ingestions(
507        &self,
508        instance_id: StorageInstanceId,
509    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
510        Box::new(self.instances[&instance_id].active_ingestions())
511    }
512
513    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
514        self.storage_collections.check_exists(id)
515    }
516
517    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
518        let metrics = self.metrics.for_instance(id);
519        let mut instance = Instance::new(
520            workload_class,
521            metrics,
522            Arc::clone(self.config().config_set()),
523            self.now.clone(),
524            self.instance_response_tx.clone(),
525        );
526        if self.initialized {
527            instance.send(StorageCommand::InitializationComplete);
528        }
529        if !self.read_only {
530            instance.send(StorageCommand::AllowWrites);
531        }
532
533        let params = Box::new(self.config.parameters.clone());
534        instance.send(StorageCommand::UpdateConfiguration(params));
535
536        let old_instance = self.instances.insert(id, instance);
537        assert_none!(old_instance, "storage instance {id} already exists");
538    }
539
540    fn drop_instance(&mut self, id: StorageInstanceId) {
541        let instance = self.instances.remove(&id);
542        assert!(instance.is_some(), "storage instance {id} does not exist");
543    }
544
545    fn update_instance_workload_class(
546        &mut self,
547        id: StorageInstanceId,
548        workload_class: Option<String>,
549    ) {
550        let instance = self
551            .instances
552            .get_mut(&id)
553            .unwrap_or_else(|| panic!("instance {id} does not exist"));
554
555        instance.workload_class = workload_class;
556    }
557
558    fn connect_replica(
559        &mut self,
560        instance_id: StorageInstanceId,
561        replica_id: ReplicaId,
562        location: ClusterReplicaLocation,
563        enable_ctp: bool,
564    ) {
565        let instance = self
566            .instances
567            .get_mut(&instance_id)
568            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
569
570        let config = ReplicaConfig {
571            build_info: self.build_info,
572            location,
573            grpc_client: self.config.parameters.grpc_client.clone(),
574            enable_ctp,
575        };
576        instance.add_replica(replica_id, config);
577    }
578
579    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
580        let instance = self
581            .instances
582            .get_mut(&instance_id)
583            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
584
585        let status_now = mz_ore::now::to_datetime((self.now)());
586        let mut source_status_updates = vec![];
587        let mut sink_status_updates = vec![];
588
589        let make_update = |id, object_type| StatusUpdate {
590            id,
591            status: Status::Paused,
592            timestamp: status_now,
593            error: None,
594            hints: BTreeSet::from([format!(
595                "The replica running this {object_type} has been dropped"
596            )]),
597            namespaced_errors: Default::default(),
598            replica_id: Some(replica_id),
599        };
600
601        for id in instance.active_ingestions() {
602            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
603                active_replicas.remove(&replica_id);
604                if active_replicas.is_empty() {
605                    self.dropped_objects.remove(id);
606                }
607            }
608
609            let ingestion = self
610                .collections
611                .get_mut(id)
612                .expect("instance contains unknown ingestion");
613
614            let ingestion_description = match &ingestion.data_source {
615                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
616                _ => panic!(
617                    "unexpected data source for ingestion: {:?}",
618                    ingestion.data_source
619                ),
620            };
621
622            // NOTE(aljoscha): We filter out the remap collection because we
623            // don't get any status updates about it from the replica side. So
624            // we don't want to synthesize a 'paused' status here.
625            //
626            // TODO(aljoscha): I think we want to fix this eventually, and make
627            // sure we get status updates for the remap shard as well. Currently
628            // its handling in the source status collection is a bit difficult
629            // because we don't have updates for it in the status history
630            // collection.
631            let subsource_ids = ingestion_description
632                .collection_ids()
633                .filter(|id| id != &ingestion_description.remap_collection_id);
634            for id in subsource_ids {
635                source_status_updates.push(make_update(id, "source"));
636            }
637        }
638
639        for id in instance.active_exports() {
640            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
641                active_replicas.remove(&replica_id);
642                if active_replicas.is_empty() {
643                    self.dropped_objects.remove(id);
644                }
645            }
646
647            sink_status_updates.push(make_update(*id, "sink"));
648        }
649
650        instance.drop_replica(replica_id);
651
652        if !self.read_only {
653            if !source_status_updates.is_empty() {
654                self.append_status_introspection_updates(
655                    IntrospectionType::SourceStatusHistory,
656                    source_status_updates,
657                );
658            }
659            if !sink_status_updates.is_empty() {
660                self.append_status_introspection_updates(
661                    IntrospectionType::SinkStatusHistory,
662                    sink_status_updates,
663                );
664            }
665        }
666    }
667
668    async fn evolve_nullability_for_bootstrap(
669        &mut self,
670        storage_metadata: &StorageMetadata,
671        collections: Vec<(GlobalId, RelationDesc)>,
672    ) -> Result<(), StorageError<Self::Timestamp>> {
673        let persist_client = self
674            .persist
675            .open(self.persist_location.clone())
676            .await
677            .unwrap();
678
679        for (global_id, relation_desc) in collections {
680            let shard_id = storage_metadata.get_collection_shard(global_id)?;
681            let diagnostics = Diagnostics {
682                shard_name: global_id.to_string(),
683                handle_purpose: "evolve nullability for bootstrap".to_string(),
684            };
685            let latest_schema = persist_client
686                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
687                .await
688                .expect("invalid persist usage");
689            let Some((schema_id, current_schema, _)) = latest_schema else {
690                tracing::debug!(?global_id, "no schema registered");
691                continue;
692            };
693            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");
694
695            let diagnostics = Diagnostics {
696                shard_name: global_id.to_string(),
697                handle_purpose: "evolve nullability for bootstrap".to_string(),
698            };
699            let evolve_result = persist_client
700                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
701                    shard_id,
702                    schema_id,
703                    &relation_desc,
704                    &UnitSchema,
705                    diagnostics,
706                )
707                .await
708                .expect("invalid persist usage");
709            match evolve_result {
710                CaESchema::Ok(_) => (),
711                CaESchema::ExpectedMismatch {
712                    schema_id,
713                    key,
714                    val: _,
715                } => {
716                    return Err(StorageError::PersistSchemaEvolveRace {
717                        global_id,
718                        shard_id,
719                        schema_id,
720                        relation_desc: key,
721                    });
722                }
723                CaESchema::Incompatible => {
724                    return Err(StorageError::PersistInvalidSchemaEvolve {
725                        global_id,
726                        shard_id,
727                    });
728                }
729            };
730        }
731
732        Ok(())
733    }
734
735    /// Create and "execute" the described collection.
736    ///
737    /// "Execute" is in scare quotes because what executing a collection means
738    /// varies widely based on the type of collection you're creating.
739    ///
740    /// The general process creating a collection undergoes is:
741    /// 1. Enrich the description we get from the user with the metadata only
742    ///    the storage controller's metadata. This is mostly a matter of
743    ///    separating concerns.
744    /// 2. Generate write and read persist handles for the collection.
745    /// 3. Store the collection's metadata in the appropriate field.
746    /// 4. "Execute" the collection. What that means is contingent on the type of
747    ///    collection. so consult the code for more details.
748    ///
749    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
750    // a method/move individual parts to their own methods. @guswynn observes
751    // that a number of these operations could be moved into fns on
752    // `DataSource`.
753    #[instrument(name = "storage::create_collections")]
754    async fn create_collections_for_bootstrap(
755        &mut self,
756        storage_metadata: &StorageMetadata,
757        register_ts: Option<Self::Timestamp>,
758        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
759        migrated_storage_collections: &BTreeSet<GlobalId>,
760    ) -> Result<(), StorageError<Self::Timestamp>> {
761        self.migrated_storage_collections
762            .extend(migrated_storage_collections.iter().cloned());
763
764        self.storage_collections
765            .create_collections_for_bootstrap(
766                storage_metadata,
767                register_ts.clone(),
768                collections.clone(),
769                migrated_storage_collections,
770            )
771            .await?;
772
773        // At this point we're connected to all the collection shards in persist. Our warming task
774        // is no longer useful, so abort it if it's still running.
775        drop(self.persist_warm_task.take());
776
777        // Validate first, to avoid corrupting state.
778        // 1. create a dropped identifier, or
779        // 2. create an existing identifier with a new description.
780        // Make sure to check for errors within `ingestions` as well.
781        collections.sort_by_key(|(id, _)| *id);
782        collections.dedup();
783        for pos in 1..collections.len() {
784            if collections[pos - 1].0 == collections[pos].0 {
785                return Err(StorageError::CollectionIdReused(collections[pos].0));
786            }
787        }
788
789        // We first enrich each collection description with some additional metadata...
790        let enriched_with_metadata = collections
791            .into_iter()
792            .map(|(id, description)| {
793                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
794
795                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
796                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
797                    Ok(shard)
798                };
799
800                let remap_shard = match &description.data_source {
801                    // Only ingestions can have remap shards.
802                    DataSource::Ingestion(IngestionDescription {
803                        remap_collection_id,
804                        ..
805                    }) => {
806                        // Iff ingestion has a remap collection, its metadata must
807                        // exist (and be correct) by this point.
808                        Some(get_shard(*remap_collection_id)?)
809                    }
810                    _ => None,
811                };
812
813                // If the shard is being managed by txn-wal (initially, tables), then we need to
814                // pass along the shard id for the txns shard to dataflow rendering.
815                let txns_shard = description
816                    .data_source
817                    .in_txns()
818                    .then(|| *self.txns_read.txns_id());
819
820                let metadata = CollectionMetadata {
821                    persist_location: self.persist_location.clone(),
822                    remap_shard,
823                    data_shard,
824                    relation_desc: description.desc.clone(),
825                    txns_shard,
826                };
827
828                Ok((id, description, metadata))
829            })
830            .collect_vec();
831
832        // So that we can open persist handles for each collections concurrently.
833        let persist_client = self
834            .persist
835            .open(self.persist_location.clone())
836            .await
837            .unwrap();
838        let persist_client = &persist_client;
839
840        // Reborrow the `&mut self` as immutable, as all the concurrent work to be processed in
841        // this stream cannot all have exclusive access.
842        use futures::stream::{StreamExt, TryStreamExt};
843        let this = &*self;
844        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
845            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
846                async move {
847                    let (id, description, metadata) = data?;
848
849                    // should be replaced with real introspection (https://github.com/MaterializeInc/database-issues/issues/4078)
850                    // but for now, it's helpful to have this mapping written down somewhere
851                    debug!(
852                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
853                        id, metadata.remap_shard, metadata.data_shard
854                    );
855
856                    let write = this
857                        .open_data_handles(
858                            &id,
859                            metadata.data_shard,
860                            metadata.relation_desc.clone(),
861                            persist_client,
862                        )
863                        .await;
864
865                    Ok::<_, StorageError<T>>((id, description, write, metadata))
866                }
867            })
868            // Poll each future for each collection concurrently, maximum of 50 at a time.
869            .buffer_unordered(50)
870            // HERE BE DRAGONS:
871            //
872            // There are at least 2 subtleties in using `FuturesUnordered` (which
873            // `buffer_unordered` uses underneath:
874            // - One is captured here <https://github.com/rust-lang/futures-rs/issues/2387>
875            // - And the other is deadlocking if processing an OUTPUT of a `FuturesUnordered`
876            // stream attempts to obtain an async mutex that is also obtained in the futures
877            // being polled.
878            //
879            // Both of these could potentially be issues in all usages of `buffer_unordered` in
880            // this method, so we stick the standard advice: only use `try_collect` or
881            // `collect`!
882            .try_collect()
883            .await?;
884
885        // The set of collections that we should render at the end of this
886        // function.
887        let mut to_execute = BTreeSet::new();
888        // New collections that are being created; this is distinct from the set
889        // of collections we plan to execute because
890        // `DataSource::IngestionExport` is added as a new collection, but is
891        // not executed directly.
892        let mut new_collections = BTreeSet::new();
893        let mut table_registers = Vec::with_capacity(to_register.len());
894
895        // Reorder in dependency order.
896        to_register.sort_by_key(|(id, ..)| *id);
897
898        // Register tables first, but register them in reverse order since earlier tables
899        // can depend on later tables.
900        //
901        // Note: We could do more complex sorting to avoid the allocations, but IMO it's
902        // easier to reason about it this way.
903        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
904            .into_iter()
905            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
906        let to_register = tables_to_register
907            .into_iter()
908            .rev()
909            .chain(collections_to_register.into_iter());
910
911        // Statistics need a level of indirection so we can mutably borrow
912        // `self` when registering collections and when we are inserting
913        // statistics.
914        let mut new_source_statistic_entries = BTreeSet::new();
915        let mut new_webhook_statistic_entries = BTreeSet::new();
916        let mut new_sink_statistic_entries = BTreeSet::new();
917
918        for (id, description, write, metadata) in to_register {
919            let is_in_txns = |id, metadata: &CollectionMetadata| {
920                metadata.txns_shard.is_some()
921                    && !(self.read_only && migrated_storage_collections.contains(&id))
922            };
923
924            let mut data_source = description.data_source;
925
926            to_execute.insert(id);
927            new_collections.insert(id);
928
929            // Ensure that the ingestion has an export for its primary source if applicable.
930            // This is done in an awkward spot to appease the borrow checker.
931            // TODO(database-issues#8620): This will be removed once sources no longer export
932            // to primary collections and only export to explicit SourceExports (tables).
933            if let DataSource::Ingestion(ingestion) = &mut data_source {
934                let export = ingestion.desc.primary_source_export();
935                ingestion.source_exports.insert(id, export);
936            }
937
938            let write_frontier = write.upper();
939
940            // Determine if this collection has another dependency.
941            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;
942
943            let dependency_read_holds = self
944                .storage_collections
945                .acquire_read_holds(storage_dependencies)
946                .expect("can acquire read holds");
947
948            let mut dependency_since = Antichain::from_elem(T::minimum());
949            for read_hold in dependency_read_holds.iter() {
950                dependency_since.join_assign(read_hold.since());
951            }
952
953            // Assert some invariants.
954            //
955            // TODO(alter_table): Include Tables (is_in_txns) in this check. After
956            // supporting ALTER TABLE, it's now possible for a table to have a dependency
957            // and thus run this check. But tables are managed by txn-wal and thus the
958            // upper of the shard's `write_handle` generally isn't the logical upper of
959            // the shard. Instead we need to thread through the upper of the `txn` shard
960            // here so we can check this invariant.
961            if !dependency_read_holds.is_empty()
962                && !is_in_txns(id, &metadata)
963                && !matches!(&data_source, DataSource::Sink { .. })
964            {
965                // As the halt message says, this can happen when trying to come
966                // up in read-only mode and the current read-write
967                // environmentd/controller deletes a collection. We exit
968                // gracefully, which means we'll get restarted and get to try
969                // again.
970                if dependency_since.is_empty() {
971                    halt!(
972                        "dependency since frontier is empty while dependent upper \
973                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
974                        this indicates concurrent deletion of a collection",
975                        write_frontier,
976                        dependency_read_holds,
977                    );
978                }
979
980                // The dependency since cannot be beyond the dependent (our)
981                // upper unless the collection is new. In practice, the
982                // depdenency is the remap shard of a source (export), and if
983                // the since is allowed to "catch up" to the upper, that is
984                // `upper <= since`, a restarting ingestion cannot differentiate
985                // between updates that have already been written out to the
986                // backing persist shard and updates that have yet to be
987                // written. We would write duplicate updates.
988                //
989                // If this check fails, it means that the read hold installed on
990                // the dependency was probably not upheld –– if it were, the
991                // dependency's since could not have advanced as far the
992                // dependent's upper.
993                //
994                // We don't care about the dependency since when the write
995                // frontier is empty. In that case, no-one can write down any
996                // more updates.
997                mz_ore::soft_assert_or_log!(
998                    write_frontier.elements() == &[T::minimum()]
999                        || write_frontier.is_empty()
1000                        || PartialOrder::less_than(&dependency_since, write_frontier),
1001                    "dependency since has advanced past dependent ({id}) upper \n
1002                            dependent ({id}): upper {:?} \n
1003                            dependency since {:?} \n
1004                            dependency read holds: {:?}",
1005                    write_frontier,
1006                    dependency_since,
1007                    dependency_read_holds,
1008                );
1009            }
1010
1011            // Perform data source-specific setup.
1012            let mut extra_state = CollectionStateExtra::None;
1013            let mut maybe_instance_id = None;
1014            match &data_source {
1015                DataSource::Introspection(typ) => {
1016                    debug!(
1017                        ?data_source, meta = ?metadata,
1018                        "registering {id} with persist monotonic worker",
1019                    );
1020                    // We always register the collection with the collection manager,
1021                    // regardless of read-only mode. The CollectionManager itself is
1022                    // aware of read-only mode and will not attempt to write before told
1023                    // to do so.
1024                    //
1025                    self.register_introspection_collection(
1026                        id,
1027                        *typ,
1028                        write,
1029                        persist_client.clone(),
1030                    )?;
1031                }
1032                DataSource::Webhook => {
1033                    debug!(
1034                        ?data_source, meta = ?metadata,
1035                        "registering {id} with persist monotonic worker",
1036                    );
1037                    new_source_statistic_entries.insert(id);
1038                    // This collection of statistics is periodically aggregated into
1039                    // `source_statistics`.
1040                    new_webhook_statistic_entries.insert(id);
1041                    // Register the collection so our manager knows about it.
1042                    //
1043                    // NOTE: Maybe this shouldn't be in the collection manager,
1044                    // and collection manager should only be responsible for
1045                    // built-in introspection collections?
1046                    self.collection_manager
1047                        .register_append_only_collection(id, write, false, None);
1048                }
1049                DataSource::IngestionExport {
1050                    ingestion_id,
1051                    details,
1052                    data_config,
1053                } => {
1054                    debug!(
1055                        ?data_source, meta = ?metadata,
1056                        "not registering {id} with a controller persist worker",
1057                    );
1058                    // Adjust the source to contain this export.
1059                    let ingestion_state = self
1060                        .collections
1061                        .get_mut(ingestion_id)
1062                        .expect("known to exist");
1063
1064                    let instance_id = match &mut ingestion_state.data_source {
1065                        DataSource::Ingestion(ingestion_desc) => {
1066                            ingestion_desc.source_exports.insert(
1067                                id,
1068                                SourceExport {
1069                                    storage_metadata: (),
1070                                    details: details.clone(),
1071                                    data_config: data_config.clone(),
1072                                },
1073                            );
1074
1075                            // Record the ingestion's cluster ID for the
1076                            // ingestion export. This way we always have a
1077                            // record of it, even if the ingestion's collection
1078                            // description disappears.
1079                            ingestion_desc.instance_id
1080                        }
1081                        _ => unreachable!(
1082                            "SourceExport must only refer to primary sources that already exist"
1083                        ),
1084                    };
1085
1086                    // Executing the source export doesn't do anything, ensure we execute the source instead.
1087                    to_execute.remove(&id);
1088                    to_execute.insert(*ingestion_id);
1089
1090                    let ingestion_state = IngestionState {
1091                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
1092                        dependency_read_holds,
1093                        derived_since: dependency_since,
1094                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1095                        hold_policy: ReadPolicy::step_back(),
1096                        instance_id,
1097                        hydrated_on: BTreeSet::new(),
1098                    };
1099
1100                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1101                    maybe_instance_id = Some(instance_id);
1102
1103                    new_source_statistic_entries.insert(id);
1104                }
1105                DataSource::Table { .. } => {
1106                    debug!(
1107                        ?data_source, meta = ?metadata,
1108                        "registering {id} with persist table worker",
1109                    );
1110                    table_registers.push((id, write));
1111                }
1112                DataSource::Progress | DataSource::Other => {
1113                    debug!(
1114                        ?data_source, meta = ?metadata,
1115                        "not registering {id} with a controller persist worker",
1116                    );
1117                }
1118                DataSource::Ingestion(ingestion_desc) => {
1119                    debug!(
1120                        ?data_source, meta = ?metadata,
1121                        "not registering {id} with a controller persist worker",
1122                    );
1123
1124                    let mut dependency_since = Antichain::from_elem(T::minimum());
1125                    for read_hold in dependency_read_holds.iter() {
1126                        dependency_since.join_assign(read_hold.since());
1127                    }
1128
1129                    let ingestion_state = IngestionState {
1130                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
1131                        dependency_read_holds,
1132                        derived_since: dependency_since,
1133                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1134                        hold_policy: ReadPolicy::step_back(),
1135                        instance_id: ingestion_desc.instance_id,
1136                        hydrated_on: BTreeSet::new(),
1137                    };
1138
1139                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1140                    maybe_instance_id = Some(ingestion_desc.instance_id);
1141
1142                    new_source_statistic_entries.insert(id);
1143                }
1144                DataSource::Sink { desc } => {
1145                    let mut dependency_since = Antichain::from_elem(T::minimum());
1146                    for read_hold in dependency_read_holds.iter() {
1147                        dependency_since.join_assign(read_hold.since());
1148                    }
1149
1150                    let [self_hold, read_hold] =
1151                        dependency_read_holds.try_into().expect("two holds");
1152
1153                    let state = ExportState::new(
1154                        desc.instance_id,
1155                        read_hold,
1156                        self_hold,
1157                        write_frontier.clone(),
1158                        ReadPolicy::step_back(),
1159                    );
1160                    maybe_instance_id = Some(state.cluster_id);
1161                    extra_state = CollectionStateExtra::Export(state);
1162
1163                    new_sink_statistic_entries.insert(id);
1164                }
1165            }
1166
1167            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
1168            let collection_state =
1169                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);
1170
1171            self.collections.insert(id, collection_state);
1172        }
1173
1174        {
1175            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1176
1177            // Webhooks don't run on clusters/replicas, so we initialize their
1178            // statistics collection here.
1179            for id in new_webhook_statistic_entries {
1180                source_statistics.webhook_statistics.entry(id).or_default();
1181            }
1182
1183            // Sources and sinks only have statistics in the collection when
1184            // there is a replica that is reporting them. No need to initialize
1185            // here.
1186        }
1187
1188        // Register the tables all in one batch.
1189        if !table_registers.is_empty() {
1190            let register_ts = register_ts
1191                .expect("caller should have provided a register_ts when creating a table");
1192
1193            if self.read_only {
1194                // In read-only mode, we use a special read-only table worker
1195                // that allows writing to migrated tables and will continually
1196                // bump their shard upper so that it tracks the txn shard upper.
1197                // We do this, so that they remain readable at a recent
1198                // timestamp, which in turn allows dataflows that depend on them
1199                // to (re-)hydrate.
1200                //
1201                // We only want to register migrated tables, though, and leave
1202                // existing tables out/never write to them in read-only mode.
1203                table_registers
1204                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));
1205
1206                self.persist_table_worker
1207                    .register(register_ts, table_registers)
1208                    .await
1209                    .expect("table worker unexpectedly shut down");
1210            } else {
1211                self.persist_table_worker
1212                    .register(register_ts, table_registers)
1213                    .await
1214                    .expect("table worker unexpectedly shut down");
1215            }
1216        }
1217
1218        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);
1219
1220        // TODO(guswynn): perform the io in this final section concurrently.
1221        for id in to_execute {
1222            match &self.collection(id)?.data_source {
1223                DataSource::Ingestion(ingestion) => {
1224                    if !self.read_only
1225                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
1226                            && ingestion.desc.connection.supports_read_only())
1227                    {
1228                        self.run_ingestion(id)?;
1229                    }
1230                }
1231                DataSource::IngestionExport { .. } => unreachable!(
1232                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
1233                ),
1234                DataSource::Introspection(_)
1235                | DataSource::Webhook
1236                | DataSource::Table { .. }
1237                | DataSource::Progress
1238                | DataSource::Other => {}
1239                DataSource::Sink { .. } => {
1240                    if !self.read_only {
1241                        self.run_export(id)?;
1242                    }
1243                }
1244            };
1245        }
1246
1247        Ok(())
1248    }
1249
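    /// Checks whether the ingestion identified by `ingestion_id` can be
    /// altered to the proposed `source_desc`, i.e. whether the two
    /// descriptions are `alter_compatible`.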
1250    fn check_alter_ingestion_source_desc(
1251        &mut self,
1252        ingestion_id: GlobalId,
1253        source_desc: &SourceDesc,
1254    ) -> Result<(), StorageError<Self::Timestamp>> {
1255        let source_collection = self.collection(ingestion_id)?;
1256        let data_source = &source_collection.data_source;
1257        match &data_source {
1258            DataSource::Ingestion(cur_ingestion) => {
1259                cur_ingestion
1260                    .desc
1261                    .alter_compatible(ingestion_id, source_desc)?;
1262            }
1263            o => {
1264                tracing::info!(
1265                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1266                    o
1267                );
1268                Err(AlterError { id: ingestion_id })?
1269            }
1270        }
1271
1272        Ok(())
1273    }
1274
1275    async fn alter_ingestion_connections(
1276        &mut self,
1277        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1278    ) -> Result<(), StorageError<Self::Timestamp>> {
1279        // Also have to let StorageCollections know!
1280        self.storage_collections
1281            .alter_ingestion_connections(source_connections.clone())
1282            .await?;
1283
1284        let mut ingestions_to_run = BTreeSet::new();
1285
1286        for (id, conn) in source_connections {
1287            let collection = self
1288                .collections
1289                .get_mut(&id)
1290                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1291
1292            match &mut collection.data_source {
1293                DataSource::Ingestion(ingestion) => {
1294                    // If the connection hasn't changed, there's no sense in
1295                    // re-rendering the dataflow.
1296                    if ingestion.desc.connection != conn {
1297                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1298                        ingestion.desc.connection = conn;
1299                        ingestions_to_run.insert(id);
1300                    } else {
1301                        tracing::warn!(
1302                            "alter_ingestion_connections called on {id} but the \
1303                            connection was the same"
1304                        );
1305                    }
1306                }
1307                o => {
1308                    tracing::warn!("alter_ingestion_connections called on {:?}", o);
1309                    Err(StorageError::IdentifierInvalid(id))?;
1310                }
1311            }
1312        }
1313
1314        for id in ingestions_to_run {
1315            self.run_ingestion(id)?;
1316        }
1317        Ok(())
1318    }
1319
1320    async fn alter_ingestion_export_data_configs(
1321        &mut self,
1322        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1323    ) -> Result<(), StorageError<Self::Timestamp>> {
1324        // Also have to let StorageCollections know!
1325        self.storage_collections
1326            .alter_ingestion_export_data_configs(source_exports.clone())
1327            .await?;
1328
1329        let mut ingestions_to_run = BTreeSet::new();
1330
1331        for (source_export_id, new_data_config) in source_exports {
1332            // We need to adjust the data config on the CollectionState for
1333            // the source export collection directly
1334            let source_export_collection = self
1335                .collections
1336                .get_mut(&source_export_id)
1337                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1338            let ingestion_id = match &mut source_export_collection.data_source {
1339                DataSource::IngestionExport {
1340                    ingestion_id,
1341                    details: _,
1342                    data_config,
1343                } => {
1344                    *data_config = new_data_config.clone();
1345                    *ingestion_id
1346                }
1347                o => {
1348                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1349                    Err(StorageError::IdentifierInvalid(source_export_id))?
1350                }
1351            };
1352            // We also need to adjust the data config on the CollectionState of the
1353            // Ingestion that the export is associated with.
1354            let ingestion_collection = self
1355                .collections
1356                .get_mut(&ingestion_id)
1357                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
1358
1359            match &mut ingestion_collection.data_source {
1360                DataSource::Ingestion(ingestion_desc) => {
1361                    let source_export = ingestion_desc
1362                        .source_exports
1363                        .get_mut(&source_export_id)
1364                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1365
1366                    // If the data config hasn't changed, there's no sense in
1367                    // re-rendering the dataflow.
1368                    if source_export.data_config != new_data_config {
1369                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
1370                        source_export.data_config = new_data_config;
1371
1372                        ingestions_to_run.insert(ingestion_id);
1373                    } else {
1374                        tracing::warn!(
1375                            "alter_ingestion_export_data_configs called on \
1376                                    export {source_export_id} of {ingestion_id} but \
1377                                    the data config was the same"
1378                        );
1379                    }
1380                }
1381                o => {
1382                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1383                    Err(StorageError::IdentifierInvalid(ingestion_id))?
1384                }
1385            }
1386        }
1387
1388        for id in ingestions_to_run {
1389            self.run_ingestion(id)?;
1390        }
1391        Ok(())
1392    }
1393
1394    async fn alter_table_desc(
1395        &mut self,
1396        existing_collection: GlobalId,
1397        new_collection: GlobalId,
1398        new_desc: RelationDesc,
1399        expected_version: RelationVersion,
1400        register_ts: Self::Timestamp,
1401    ) -> Result<(), StorageError<Self::Timestamp>> {
1402        let data_shard = {
1403            let Controller {
1404                collections,
1405                storage_collections,
1406                ..
1407            } = self;
1408
1409            let existing = collections
1410                .get(&existing_collection)
1411                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
1412            if !matches!(existing.data_source, DataSource::Table { .. }) {
1413                return Err(StorageError::IdentifierInvalid(existing_collection));
1414            }
1415
1416            // Let StorageCollections know!
1417            storage_collections
1418                .alter_table_desc(
1419                    existing_collection,
1420                    new_collection,
1421                    new_desc.clone(),
1422                    expected_version,
1423                )
1424                .await?;
1425
1426            existing.collection_metadata.data_shard.clone()
1427        };
1428
1429        let persist_client = self
1430            .persist
1431            .open(self.persist_location.clone())
1432            .await
1433            .expect("invalid persist location");
1434        let write_handle = self
1435            .open_data_handles(
1436                &existing_collection,
1437                data_shard,
1438                new_desc.clone(),
1439                &persist_client,
1440            )
1441            .await;
1442
1443        // Note: The new collection is now the "primary collection" so we specify `None` here.
1444        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
1445        let collection_meta = CollectionMetadata {
1446            persist_location: self.persist_location.clone(),
1447            data_shard,
1448            relation_desc: new_desc.clone(),
1449            // TODO(alter_table): Support schema evolution on sources.
1450            remap_shard: None,
1451            txns_shard: Some(self.txns_read.txns_id().clone()),
1452        };
1453        // TODO(alter_table): Support schema evolution on sources.
1454        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
1455        let collection_state = CollectionState::new(
1456            collection_desc.data_source.clone(),
1457            collection_meta,
1458            CollectionStateExtra::None,
1459            wallclock_lag_metrics,
1460        );
1461
1462        // Great! We have successfully evolved the schema of our table; now we need to update our
1463        // in-memory data structures.
1464        self.collections.insert(new_collection, collection_state);
1465        let existing = self
1466            .collections
1467            .get_mut(&existing_collection)
1468            .expect("missing existing collection");
1469        assert!(matches!(
1470            existing.data_source,
1471            DataSource::Table { primary: None }
1472        ));
1473        existing.data_source = DataSource::Table {
1474            primary: Some(new_collection),
1475        };
1476
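        // Register the new collection's write handle with the persist table
        // worker at `register_ts`, so that subsequent table writes go through
        // the evolved schema.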
1477        self.persist_table_worker
1478            .register(register_ts, vec![(new_collection, write_handle)])
1479            .await
1480            .expect("table worker unexpectedly shut down");
1481
1482        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);
1483
1484        Ok(())
1485    }
1486
1487    fn export(
1488        &self,
1489        id: GlobalId,
1490    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1491        self.collections
1492            .get(&id)
1493            .and_then(|c| match &c.extra_state {
1494                CollectionStateExtra::Export(state) => Some(state),
1495                _ => None,
1496            })
1497            .ok_or(StorageError::IdentifierMissing(id))
1498    }
1499
1500    fn export_mut(
1501        &mut self,
1502        id: GlobalId,
1503    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1504        self.collections
1505            .get_mut(&id)
1506            .and_then(|c| match &mut c.extra_state {
1507                CollectionStateExtra::Export(state) => Some(state),
1508                _ => None,
1509            })
1510            .ok_or(StorageError::IdentifierMissing(id))
1511    }
1512
1513    /// Create a oneshot ingestion.
1514    async fn create_oneshot_ingestion(
1515        &mut self,
1516        ingestion_id: uuid::Uuid,
1517        collection_id: GlobalId,
1518        instance_id: StorageInstanceId,
1519        request: OneshotIngestionRequest,
1520        result_tx: OneshotResultCallback<ProtoBatch>,
1521    ) -> Result<(), StorageError<Self::Timestamp>> {
1522        let collection_meta = self
1523            .collections
1524            .get(&collection_id)
1525            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1526            .collection_metadata
1527            .clone();
1528        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1529            // TODO(cf2): Refine this error.
1530            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1531        })?;
1532        let oneshot_cmd = RunOneshotIngestion {
1533            ingestion_id,
1534            collection_id,
1535            collection_meta,
1536            request,
1537        };
1538
1539        if !self.read_only {
1540            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1541            let pending = PendingOneshotIngestion {
1542                result_tx,
1543                cluster_id: instance_id,
1544            };
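            // There must not already be a pending oneshot ingestion under
            // this UUID; `assert_none!` below checks that the insert did not
            // clobber an existing callback.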
1545            let novel = self
1546                .pending_oneshot_ingestions
1547                .insert(ingestion_id, pending);
1548            assert_none!(novel);
1549            Ok(())
1550        } else {
1551            Err(StorageError::ReadOnly)
1552        }
1553    }
1554
1555    fn cancel_oneshot_ingestion(
1556        &mut self,
1557        ingestion_id: uuid::Uuid,
1558    ) -> Result<(), StorageError<Self::Timestamp>> {
1559        if self.read_only {
1560            return Err(StorageError::ReadOnly);
1561        }
1562
1563        let pending = self
1564            .pending_oneshot_ingestions
1565            .remove(&ingestion_id)
1566            .ok_or_else(|| {
1567                // TODO(cf2): Refine this error.
1568                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1569            })?;
1570
1571        match self.instances.get_mut(&pending.cluster_id) {
1572            Some(instance) => {
1573                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1574            }
1575            None => {
1576                mz_ore::soft_panic_or_log!(
1577                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1578                    ingestion_id,
1579                    pending.cluster_id,
1580                );
1581            }
1582        }
1583        // Respond to the user that the request has been canceled.
1584        pending.cancel();
1585
1586        Ok(())
1587    }
1588
1589    async fn alter_export(
1590        &mut self,
1591        id: GlobalId,
1592        new_description: ExportDescription<Self::Timestamp>,
1593    ) -> Result<(), StorageError<Self::Timestamp>> {
1594        let from_id = new_description.sink.from;
1595
1596        // Acquire read holds at StorageCollections to ensure that the
1597        // sinked collection is not dropped while we're sinking it.
1598        let desired_read_holds = vec![from_id.clone(), id.clone()];
1599        let [input_hold, self_hold] = self
1600            .storage_collections
1601            .acquire_read_holds(desired_read_holds)
1602            .expect("missing dependency")
1603            .try_into()
1604            .expect("expected number of holds");
1605        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1606        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1607
1608        // Check whether the sink's write frontier is beyond the read hold we got
1609        let cur_export = self.export_mut(id)?;
1610        let input_readable = cur_export
1611            .write_frontier
1612            .iter()
1613            .all(|t| input_hold.since().less_than(t));
1614        if !input_readable {
1615            return Err(StorageError::ReadBeforeSince(from_id));
1616        }
1617
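        // Build the updated export state: carry over the existing read
        // capabilities, derived since, read policy, and write frontier, but
        // adopt the new instance and the freshly acquired read holds.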
1618        let new_export = ExportState {
1619            read_capabilities: cur_export.read_capabilities.clone(),
1620            cluster_id: new_description.instance_id,
1621            derived_since: cur_export.derived_since.clone(),
1622            read_holds: [input_hold, self_hold],
1623            read_policy: cur_export.read_policy.clone(),
1624            write_frontier: cur_export.write_frontier.clone(),
1625        };
1626        *cur_export = new_export;
1627
1628        let cmd = RunSinkCommand {
1629            id,
1630            description: StorageSinkDesc {
1631                from: from_id,
1632                from_desc: new_description.sink.from_desc,
1633                connection: new_description.sink.connection,
1634                envelope: new_description.sink.envelope,
1635                as_of: new_description.sink.as_of,
1636                version: new_description.sink.version,
1637                from_storage_metadata,
1638                with_snapshot: new_description.sink.with_snapshot,
1639                to_storage_metadata,
1640            },
1641        };
1642
1643        // Fetch the client for this export's cluster.
1644        let instance = self
1645            .instances
1646            .get_mut(&new_description.instance_id)
1647            .ok_or_else(|| StorageError::ExportInstanceMissing {
1648                storage_instance_id: new_description.instance_id,
1649                export_id: id,
1650            })?;
1651
1652        instance.send(StorageCommand::RunSink(Box::new(cmd)));
1653        Ok(())
1654    }
1655
1656    /// Update the connections of the given sinks and re-render their dataflows.
1657    async fn alter_export_connections(
1658        &mut self,
1659        exports: BTreeMap<GlobalId, StorageSinkConnection>,
1660    ) -> Result<(), StorageError<Self::Timestamp>> {
1661        let mut updates_by_instance =
1662            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();
1663
1664        for (id, connection) in exports {
1665            // We stage changes in new_export_description and then apply all
1666            // updates to exports at the end.
1667            //
1668            // We don't just go ahead and clone the `ExportState` itself and
1669            // update that because `ExportState` is not clone, because it holds
1670            // a `ReadHandle` and cloning that would cause additional work for
1671            // whoever guarantees those read holds.
1672            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
1673                let export = &self.collections[&id];
1674                let DataSource::Sink { desc } = &export.data_source else {
1675                    panic!("export known to exist")
1676                };
1677                let CollectionStateExtra::Export(state) = &export.extra_state else {
1678                    panic!("export known to exist")
1679                };
1680                let export_description = desc.clone();
1681                let as_of = state.input_hold().since().clone();
1682
1683                (export_description, as_of)
1684            };
1685            let current_sink = new_export_description.sink.clone();
1686
1687            new_export_description.sink.connection = connection;
1688
1689            // Ensure compatibility
1690            current_sink.alter_compatible(id, &new_export_description.sink)?;
1691
1692            let from_storage_metadata = self
1693                .storage_collections
1694                .collection_metadata(new_export_description.sink.from)?;
1695            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1696
1697            let cmd = RunSinkCommand {
1698                id,
1699                description: StorageSinkDesc {
1700                    from: new_export_description.sink.from,
1701                    from_desc: new_export_description.sink.from_desc.clone(),
1702                    connection: new_export_description.sink.connection.clone(),
1703                    envelope: new_export_description.sink.envelope,
1704                    with_snapshot: new_export_description.sink.with_snapshot,
1705                    version: new_export_description.sink.version,
1706                    // Here we are about to send a RunSinkCommand with the current read capability
1707                    // held by this sink. However, clusters are already running a version of the
1708                    // sink and nothing guarantees that by the time this command arrives at the
1709                    // clusters they won't have made additional progress such that this read
1710                    // capability is invalidated.
1711                    // The solution to this problem is for the controller to track specific
1712                    // executions of dataflows such that it can track the shutdown of the current
1713                    // instance and the initialization of the new instance separately and ensure
1714                    // read holds are held for the correct amount of time.
1715                    // TODO(petrosagg): change the controller to explicitly track dataflow executions
1716                    as_of: as_of.to_owned(),
1717                    from_storage_metadata,
1718                    to_storage_metadata,
1719                },
1720            };
1721
1722            let update = updates_by_instance
1723                .entry(new_export_description.instance_id)
1724                .or_default();
1725            update.push((cmd, new_export_description));
1726        }
1727
1728        for (instance_id, updates) in updates_by_instance {
1729            let mut export_updates = BTreeMap::new();
1730            let mut cmds = Vec::with_capacity(updates.len());
1731
1732            for (cmd, export_state) in updates {
1733                export_updates.insert(cmd.id, export_state);
1734                cmds.push(cmd);
1735            }
1736
1737            // Fetch the client for this export's cluster.
1738            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1739                StorageError::ExportInstanceMissing {
1740                    storage_instance_id: instance_id,
1741                    export_id: *export_updates
1742                        .keys()
1743                        .next()
1744                        .expect("set of exports not empty"),
1745                }
1746            })?;
1747
1748            for cmd in cmds {
1749                instance.send(StorageCommand::RunSink(Box::new(cmd)));
1750            }
1751
1752            // Update state only after all possible errors have occurred.
1753            for (id, new_export_description) in export_updates {
1754                let Some(state) = self.collections.get_mut(&id) else {
1755                    panic!("export known to exist")
1756                };
1757                let DataSource::Sink { desc } = &mut state.data_source else {
1758                    panic!("export known to exist")
1759                };
1760                *desc = new_export_description;
1761            }
1762        }
1763
1764        Ok(())
1765    }
1766
1767    // Dropping a table takes roughly the following flow:
1768    //
1769    // First determine if this is a TableWrites table or a source-fed table (an IngestionExport):
1770    //
1771    // If this is a TableWrites table:
1772    //   1. We remove the table from the persist table write worker.
1773    //   2. The table removal is awaited in an async task.
1774    //   3. A message is sent to the storage controller that the table has been removed from the
1775    //      table write worker.
1776    //   4. The controller drains all table drop messages during `process`.
1777    //   5. `process` calls `drop_sources` with the dropped tables.
1778    //
1779    // If this is an IngestionExport table:
1780    //   1. We validate the ids and then call drop_sources_unvalidated to proceed dropping.
1781    fn drop_tables(
1782        &mut self,
1783        storage_metadata: &StorageMetadata,
1784        identifiers: Vec<GlobalId>,
1785        ts: Self::Timestamp,
1786    ) -> Result<(), StorageError<Self::Timestamp>> {
1787        // Collect tables by their data_source
1788        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1789            .into_iter()
1790            .partition(|id| match self.collections[id].data_source {
1791                DataSource::Table { .. } => true,
1792                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1793                _ => panic!("identifier is not a table: {}", id),
1794            });
1795
1796        // Drop table write tables
1797        if !table_write_ids.is_empty() {
1798            let drop_notif = self
1799                .persist_table_worker
1800                .drop_handles(table_write_ids.clone(), ts);
1801            let tx = self.pending_table_handle_drops_tx.clone();
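            // Once the persist handles are actually dropped, notify the
            // controller via the pending-drops channel; `process` drains these
            // and finishes the drop by calling `drop_sources`.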
1802            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1803                drop_notif.await;
1804                for identifier in table_write_ids {
1805                    let _ = tx.send(identifier);
1806                }
1807            });
1808        }
1809
1810        // Drop source-fed tables
1811        if !data_source_ids.is_empty() {
1812            self.validate_collection_ids(data_source_ids.iter().cloned())?;
1813            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1814        }
1815
1816        Ok(())
1817    }
1818
1819    fn drop_sources(
1820        &mut self,
1821        storage_metadata: &StorageMetadata,
1822        identifiers: Vec<GlobalId>,
1823    ) -> Result<(), StorageError<Self::Timestamp>> {
1824        self.validate_collection_ids(identifiers.iter().cloned())?;
1825        self.drop_sources_unvalidated(storage_metadata, identifiers)
1826    }
1827
1828    fn drop_sources_unvalidated(
1829        &mut self,
1830        storage_metadata: &StorageMetadata,
1831        ids: Vec<GlobalId>,
1832    ) -> Result<(), StorageError<Self::Timestamp>> {
1833        // Keep track of which ingestions we have to execute still, and which we
1834        // have to change because of dropped subsources.
1835        let mut ingestions_to_execute = BTreeSet::new();
1836        let mut ingestions_to_drop = BTreeSet::new();
1837        let mut source_statistics_to_drop = Vec::new();
1838
1839        // Ingestions (and their exports) are also collections, but we keep
1840        // track of non-ingestion collections separately, because they have
1841        // slightly different cleanup logic below.
1842        let mut collections_to_drop = Vec::new();
1843
1844        for id in ids.iter() {
1845            let metadata = storage_metadata.get_collection_shard::<T>(*id);
1846            mz_ore::soft_assert_or_log!(
1847                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
1848                "dropping {id}, but drop was not synchronized with storage \
1849                controller via `synchronize_collections`"
1850            );
1851
1852            let collection_state = self.collections.get(id);
1853
1854            if let Some(collection_state) = collection_state {
1855                match collection_state.data_source {
1856                    DataSource::Webhook => {
1857                        // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
1858                        // could probably use some love and maybe get merged together?
1859                        let fut = self.collection_manager.unregister_collection(*id);
1860                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);
1861
1862                        collections_to_drop.push(*id);
1863                        source_statistics_to_drop.push(*id);
1864                    }
1865                    DataSource::Ingestion(_) => {
1866                        ingestions_to_drop.insert(*id);
1867                        source_statistics_to_drop.push(*id);
1868                    }
1869                    DataSource::IngestionExport { ingestion_id, .. } => {
1870                        // If we are dropping source exports, we need to modify the
1871                        // ingestion that it runs on.
1872                        //
1873                        // If we remove this export, we need to stop producing data to
1874                        // it, so plan to re-execute the ingestion with the amended
1875                        // description.
1876                        ingestions_to_execute.insert(ingestion_id);
1877
1878                        // Adjust the source to remove this export.
1879                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
1880                            Some(ingestion_collection) => ingestion_collection,
1881                            // Primary ingestion already dropped.
1882                            None => {
1883                                tracing::error!(
1884                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
1885                                );
1886                                continue;
1887                            }
1888                        };
1889
1890                        match &mut ingestion_state.data_source {
1891                            DataSource::Ingestion(ingestion_desc) => {
1892                                let removed = ingestion_desc.source_exports.remove(id);
1893                                mz_ore::soft_assert_or_log!(
1894                                    removed.is_some(),
1895                                    "dropped subsource {id} already removed from source exports"
1896                                );
1897                            }
1898                            _ => unreachable!(
1899                                "SourceExport must only refer to primary sources that already exist"
1900                            ),
1901                        };
1902
1903                        // Ingestion exports also have ReadHolds that we need to
1904                        // downgrade, and much of their drop machinery is the
1905                        // same as for the "main" ingestion.
1906                        ingestions_to_drop.insert(*id);
1907                        source_statistics_to_drop.push(*id);
1908                    }
1909                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
1910                        collections_to_drop.push(*id);
1911                    }
1912                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
1913                        // Collections of these types are either not sources and should be dropped
1914                        // through other means, or are sources but should never be dropped.
1915                        soft_panic_or_log!(
1916                            "drop_sources called on a {:?} (id={id}))",
1917                            collection_state.data_source,
1918                        );
1919                    }
1920                }
1921            }
1922        }
1923
1924        // Do not bother re-executing ingestions we know we plan to drop.
1925        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
1926        for ingestion_id in ingestions_to_execute {
1927            self.run_ingestion(ingestion_id)?;
1928        }
1929
1930        // For ingestions, we fabricate a new hold that will propagate through
1931        // the cluster and then back to us.
1932
1933        // We don't explicitly remove read capabilities! Downgrading the
1934        // frontier of the source to `[]` (the empty Antichain), will propagate
1935        // to the storage dependencies.
1936        let ingestion_policies = ingestions_to_drop
1937            .iter()
1938            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
1939            .collect();
1940
1941        tracing::debug!(
1942            ?ingestion_policies,
1943            "dropping sources by setting read hold policies"
1944        );
1945        self.set_hold_policies(ingestion_policies);
1946
1947        // Delete all collection->shard mappings
1948        let shards_to_update: BTreeSet<_> = ingestions_to_drop
1949            .iter()
1950            .chain(collections_to_drop.iter())
1951            .cloned()
1952            .collect();
1953        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);
1954
1955        let status_now = mz_ore::now::to_datetime((self.now)());
1956        let mut status_updates = vec![];
1957        for id in ingestions_to_drop.iter() {
1958            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
1959        }
1960
1961        if !self.read_only {
1962            self.append_status_introspection_updates(
1963                IntrospectionType::SourceStatusHistory,
1964                status_updates,
1965            );
1966        }
1967
1968        {
1969            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1970            for id in source_statistics_to_drop {
1971                source_statistics
1972                    .source_statistics
1973                    .retain(|(stats_id, _), _| stats_id != &id);
1974                source_statistics
1975                    .webhook_statistics
1976                    .retain(|stats_id, _| stats_id != &id);
1977            }
1978        }
1979
1980        // Remove collection state
1981        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
1982            tracing::info!(%id, "dropping collection state");
1983            let collection = self
1984                .collections
1985                .remove(id)
1986                .expect("list populated after checking that self.collections contains it");
1987
1988            let instance = match &collection.extra_state {
1989                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
1990                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
1991                CollectionStateExtra::None => None,
1992            }
1993            .and_then(|i| self.instances.get(&i));
1994
1995            // Record which replicas were running a collection, so that we can
1996            // match DroppedId messages against them and eventually remove state
1997            // from self.dropped_objects
1998            if let Some(instance) = instance {
1999                let active_replicas = instance.get_active_replicas_for_object(id);
2000                if !active_replicas.is_empty() {
2001                    // The remap collection of an ingestion doesn't have extra
2002                    // state and doesn't have an instance_id, but we still get
2003                    // upper updates for it, so we want to make sure to populate
2004                    // dropped_objects.
2005                    // TODO(aljoscha): All this is a bit icky. But here we are
2006                    // for now...
2007                    match &collection.data_source {
2008                        DataSource::Ingestion(ingestion_desc) => {
2009                            self.dropped_objects.insert(
2010                                ingestion_desc.remap_collection_id,
2011                                active_replicas.clone(),
2012                            );
2013                        }
2014                        _ => {}
2015                    }
2016
2017                    self.dropped_objects.insert(*id, active_replicas);
2018                }
2019            }
2020        }
2021
2022        // Also let StorageCollections know!
2023        self.storage_collections
2024            .drop_collections_unvalidated(storage_metadata, ids);
2025
2026        Ok(())
2027    }
2028
2029    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
2030    fn drop_sinks(
2031        &mut self,
2032        storage_metadata: &StorageMetadata,
2033        identifiers: Vec<GlobalId>,
2034    ) -> Result<(), StorageError<Self::Timestamp>> {
2035        self.validate_export_ids(identifiers.iter().cloned())?;
2036        self.drop_sinks_unvalidated(storage_metadata, identifiers);
2037        Ok(())
2038    }
2039
2040    fn drop_sinks_unvalidated(
2041        &mut self,
2042        storage_metadata: &StorageMetadata,
2043        mut sinks_to_drop: Vec<GlobalId>,
2044    ) {
2045        // Ignore exports that have already been removed.
2046        sinks_to_drop.retain(|id| self.export(*id).is_ok());
2047
2048        // TODO: ideally we'd advance the write frontier ourselves here, but this function's
2049        // not yet marked async.
2050
2051        // We don't explicitly remove read capabilities! Downgrading the
2052        // frontier of the source to `[]` (the empty Antichain), will propagate
2053        // to the storage dependencies.
2054        let drop_policy = sinks_to_drop
2055            .iter()
2056            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2057            .collect();
2058
2059        tracing::debug!(
2060            ?drop_policy,
2061            "dropping sinks by setting read hold policies"
2062        );
2063        self.set_hold_policies(drop_policy);
2064
2065        // Record the drop status for all sink drops.
2066        //
2067        // We also delete the items' statistics objects.
2068        //
2069        // The locks are held for a short time, only while we do some removals from a map.
2070
2071        let status_now = mz_ore::now::to_datetime((self.now)());
2072
2073        // Record the drop status for all pending sink drops.
2074        let mut status_updates = vec![];
2075        {
2076            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2077            for id in sinks_to_drop.iter() {
2078                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2079                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
2080            }
2081        }
2082
2083        if !self.read_only {
2084            self.append_status_introspection_updates(
2085                IntrospectionType::SinkStatusHistory,
2086                status_updates,
2087            );
2088        }
2089
2090        // Remove collection/export state
2091        for id in sinks_to_drop.iter() {
2092            tracing::info!(%id, "dropping export state");
2093            let collection = self
2094                .collections
2095                .remove(id)
2096                .expect("list populated after checking that self.collections contains it");
2097
2098            let instance = match &collection.extra_state {
2099                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2100                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2101                CollectionStateExtra::None => None,
2102            }
2103            .and_then(|i| self.instances.get(&i));
2104
2105            // Record which replicas were running an export, so that we can
2106            // match `DroppedId` messages against them and eventually remove state
2107            // from `self.dropped_objects`.
2108            if let Some(instance) = instance {
2109                let active_replicas = instance.get_active_replicas_for_object(id);
2110                if !active_replicas.is_empty() {
2111                    self.dropped_objects.insert(*id, active_replicas);
2112                }
2113            }
2114        }
2115
2116        // Also let StorageCollections know!
2117        self.storage_collections
2118            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2119    }
2120
2121    #[instrument(level = "debug")]
2122    fn append_table(
2123        &mut self,
2124        write_ts: Self::Timestamp,
2125        advance_to: Self::Timestamp,
2126        commands: Vec<(GlobalId, Vec<TableData>)>,
2127    ) -> Result<
2128        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2129        StorageError<Self::Timestamp>,
2130    > {
2131        if self.read_only {
2132            // While in read-only mode, ONLY collections that have been migrated
2133            // and need to be re-hydrated in read-only mode can be written to.
2134            if !commands
2135                .iter()
2136                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2137            {
2138                return Err(StorageError::ReadOnly);
2139            }
2140        }
2141
2142        // TODO(petrosagg): validate appends against the expected RelationDesc of the collection
2143        for (id, updates) in commands.iter() {
2144            if !updates.is_empty() {
2145                if !write_ts.less_than(&advance_to) {
2146                    return Err(StorageError::UpdateBeyondUpper(*id));
2147                }
2148            }
2149        }
2150
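        // Hand the validated table writes to the persist table worker, which
        // appends them at `write_ts` and advances the shard uppers to
        // `advance_to`; the returned receiver reports the result of the
        // append.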
2151        Ok(self
2152            .persist_table_worker
2153            .append(write_ts, advance_to, commands))
2154    }
2155
2156    fn monotonic_appender(
2157        &self,
2158        id: GlobalId,
2159    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
2160        self.collection_manager.monotonic_appender(id)
2161    }
2162
2163    fn webhook_statistics(
2164        &self,
2165        id: GlobalId,
2166    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2167        // Calls to this method are usually cached, so the lock is not in the critical path.
2168        let source_statistics = self.source_statistics.lock().expect("poisoned");
2169        source_statistics
2170            .webhook_statistics
2171            .get(&id)
2172            .cloned()
2173            .ok_or(StorageError::IdentifierMissing(id))
2174    }
2175
2176    async fn ready(&mut self) {
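        // Return immediately if there is already work for `process` to do:
        // scheduled maintenance or pending table-handle drops.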
2177        if self.maintenance_scheduled {
2178            return;
2179        }
2180
2181        if !self.pending_table_handle_drops_rx.is_empty() {
2182            return;
2183        }
2184
2185        tokio::select! {
2186            Some(m) = self.instance_response_rx.recv() => {
2187                self.stashed_responses.push(m);
2188                while let Ok(m) = self.instance_response_rx.try_recv() {
2189                    self.stashed_responses.push(m);
2190                }
2191            }
2192            _ = self.maintenance_ticker.tick() => {
2193                self.maintenance_scheduled = true;
2194            },
2195        };
2196    }
2197
2198    #[instrument(level = "debug")]
2199    fn process(
2200        &mut self,
2201        storage_metadata: &StorageMetadata,
2202    ) -> Result<Option<Response<T>>, anyhow::Error> {
2203        // Perform periodic maintenance work.
2204        if self.maintenance_scheduled {
2205            self.maintain();
2206            self.maintenance_scheduled = false;
2207        }
2208
2209        for instance in self.instances.values_mut() {
2210            instance.rehydrate_failed_replicas();
2211        }
2212
2213        let mut status_updates = vec![];
2214        let mut updated_frontiers = BTreeMap::new();
2215
2216        // Take the currently stashed responses so that we can call mut receiver functions in the loop.
2217        let stashed_responses = std::mem::take(&mut self.stashed_responses);
2218        for resp in stashed_responses {
2219            match resp {
2220                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
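                    // A replica reported a new write frontier: update our
                    // per-collection state and include the change in the
                    // response to the caller.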
2221                    self.update_write_frontier(id, &upper);
2222                    updated_frontiers.insert(id, upper);
2223                }
2224                (replica_id, StorageResponse::DroppedId(id)) => {
2225                    let replica_id = replica_id.expect("DroppedId from unknown replica");
2226                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
2227                        remaining_replicas.remove(&replica_id);
2228                        if remaining_replicas.is_empty() {
2229                            self.dropped_objects.remove(&id);
2230                        }
2231                    } else {
2232                        soft_panic_or_log!("unexpected DroppedId for {id}");
2233                    }
2234                }
2235                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
2236                    // Note we only hold the locks while moving some plain-old-data around here.
2237                    {
2238                        // NOTE(aljoscha): We explicitly unwrap the `Option`,
2239                        // because we expect that stats coming from replicas
2240                        // have a replica id.
2241                        //
2242                        // If this is `None` and we would use that to access
2243                        // state below we might clobber something unexpectedly.
2244                        let replica_id = if let Some(replica_id) = replica_id {
2245                            replica_id
2246                        } else {
2247                            tracing::error!(
2248                                ?source_stats,
2249                                "missing replica_id for source statistics update"
2250                            );
2251                            continue;
2252                        };
2253
2254                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");
2255
2256                        for stat in source_stats {
2257                            let collection_id = stat.id.clone();
2258
2259                            if self.collection(collection_id).is_err() {
2260                                // We can get updates for collections that have
2261                                // already been deleted, ignore those.
2262                                continue;
2263                            }
2264
2265                            let entry = shared_stats
2266                                .source_statistics
2267                                .entry((stat.id, Some(replica_id)));
2268
2269                            match entry {
2270                                btree_map::Entry::Vacant(vacant_entry) => {
2271                                    let mut stats = ControllerSourceStatistics::new(
2272                                        collection_id,
2273                                        Some(replica_id),
2274                                    );
2275                                    stats.incorporate(stat);
2276                                    vacant_entry.insert(stats);
2277                                }
2278                                btree_map::Entry::Occupied(mut occupied_entry) => {
2279                                    occupied_entry.get_mut().incorporate(stat);
2280                                }
2281                            }
2282                        }
2283                    }
2284
2285                    {
2286                        // NOTE(aljoscha): Same as above. We want to be
2287                        // explicit.
2288                        //
2289                        // Also, technically for sinks there are no webhook
2290                        // "sources" that would force us to use an `Option`. But
2291                        // we still have to use an option for other reasons: the
2292                        // scraper expects a trait for working with stats, and
2293                        // that in the end forces the sink stats map to also
2294                        // have `Option` in the key. Do I like that? No, but
2295                        // here we are.
2296                        let replica_id = if let Some(replica_id) = replica_id {
2297                            replica_id
2298                        } else {
2299                            tracing::error!(
2300                                ?sink_stats,
2301                                "missing replica_id for sink statistics update"
2302                            );
2303                            continue;
2304                        };
2305
2306                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
2307
2308                        for stat in sink_stats {
2309                            let collection_id = stat.id.clone();
2310
2311                            if self.collection(collection_id).is_err() {
2312                                // We can get updates for collections that have
2313                                // already been deleted, ignore those.
2314                                continue;
2315                            }
2316
2317                            let entry = shared_stats.entry((stat.id, Some(replica_id)));
2318
2319                            match entry {
2320                                btree_map::Entry::Vacant(vacant_entry) => {
2321                                    let mut stats =
2322                                        ControllerSinkStatistics::new(collection_id, replica_id);
2323                                    stats.incorporate(stat);
2324                                    vacant_entry.insert(stats);
2325                                }
2326                                btree_map::Entry::Occupied(mut occupied_entry) => {
2327                                    occupied_entry.get_mut().incorporate(stat);
2328                                }
2329                            }
2330                        }
2331                    }
2332                }
2333                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
2334                    // NOTE(aljoscha): We sniff out the hydration status for
2335                    // ingestions from status updates. This is the easiest we
2336                    // can do right now, without going deeper into changing the
2337                    // comms protocol between controller and cluster. We cannot,
2338                    // for example use `StorageResponse::FrontierUpper`,
2339                    // because those will already get sent when the ingestion is
2340                    // just being created.
2341                    //
2342                    // Sources differ in when they will report as Running. Kafka
2343                    // UPSERT sources will only switch to `Running` once their
2344                    // state has been initialized from persist, which is the
2345                    // first case that we care about right now.
2346                    //
2347                    // I wouldn't say it's ideal, but it's workable until we
2348                    // find something better.
2349                    match status_update.status {
2350                        Status::Running => {
2351                            let collection = self.collections.get_mut(&status_update.id);
2352                            match collection {
2353                                Some(collection) => {
2354                                    match collection.extra_state {
2355                                        CollectionStateExtra::Ingestion(
2356                                            ref mut ingestion_state,
2357                                        ) => {
2358                                            if ingestion_state.hydrated_on.is_empty() {
2359                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
2360                                            }
2361                                            ingestion_state.hydrated_on.insert(replica_id.expect(
2362                                                "replica id should be present for status running",
2363                                            ));
2364                                        }
2365                                        CollectionStateExtra::Export(_) => {
2366                                            // TODO(sinks): track sink hydration?
2367                                        }
2368                                        CollectionStateExtra::None => {
2369                                            // Nothing to do
2370                                        }
2371                                    }
2372                                }
2373                                None => (), // no collection, let's say that's fine
2374                                            // here
2375                            }
2376                        }
2377                        Status::Paused => {
2378                            let collection = self.collections.get_mut(&status_update.id);
2379                            match collection {
2380                                Some(collection) => {
2381                                    match collection.extra_state {
2382                                        CollectionStateExtra::Ingestion(
2383                                            ref mut ingestion_state,
2384                                        ) => {
2385                                            // TODO: Paused gets sent when there
2386                                            // are no active replicas. We should
2387                                            // change this to send a targeted
2388                                            // Pause for each replica, and do
2389                                            // more fine-grained hydration
2390                                            // tracking here.
2391                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
2392                                            ingestion_state.hydrated_on.clear();
2393                                        }
2394                                        CollectionStateExtra::Export(_) => {
2395                                            // TODO(sinks): track sink hydration?
2396                                        }
2397                                        CollectionStateExtra::None => {
2398                                            // Nothing to do
2399                                        }
2400                                    }
2401                                }
2402                                None => (), // no collection, let's say that's fine
2403                                            // here
2404                            }
2405                        }
2406                        _ => (),
2407                    }
2408
2409                    // Set replica_id in the status update if available
2410                    if let Some(id) = replica_id {
2411                        status_update.replica_id = Some(id);
2412                    }
2413                    status_updates.push(status_update);
2414                }
2415                (_replica_id, StorageResponse::StagedBatches(batches)) => {
2416                    for (ingestion_id, batches) in batches {
2417                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
2418                            Some(pending) => {
2419                                // Send a cancel command so our command history is correct, and to
2420                                // avoid duplicate work once we have active replication.
2421                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
2422                                {
2423                                    instance
2424                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
2425                                }
2426                                // Send the results down our channel.
2427                                (pending.result_tx)(batches)
2428                            }
2429                            None => {
2430                                // We might not be tracking this oneshot ingestion anymore because
2431                                // it was canceled.
2432                            }
2433                        }
2434                    }
2435                }
2436            }
2437        }
2438
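        // Record the status updates collected from replica responses (e.g. in
        // the status history introspection collections).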
2439        self.record_status_updates(status_updates);
2440
2441        // Process dropped tables in a single batch.
2442        let mut dropped_table_ids = Vec::new();
2443        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
2444            dropped_table_ids.push(dropped_id);
2445        }
2446        if !dropped_table_ids.is_empty() {
2447            self.drop_sources(storage_metadata, dropped_table_ids)?;
2448        }
2449
2450        if updated_frontiers.is_empty() {
2451            Ok(None)
2452        } else {
2453            Ok(Some(Response::FrontierUpdates(
2454                updated_frontiers.into_iter().collect(),
2455            )))
2456        }
2457    }
2458
2459    async fn inspect_persist_state(
2460        &self,
2461        id: GlobalId,
2462    ) -> Result<serde_json::Value, anyhow::Error> {
2463        let collection = &self.storage_collections.collection_metadata(id)?;
2464        let client = self
2465            .persist
2466            .open(collection.persist_location.clone())
2467            .await?;
2468        let shard_state = client
2469            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2470            .await?;
2471        let json_state = serde_json::to_value(shard_state)?;
2472        Ok(json_state)
2473    }
2474
2475    fn append_introspection_updates(
2476        &mut self,
2477        type_: IntrospectionType,
2478        updates: Vec<(Row, Diff)>,
2479    ) {
2480        let id = self.introspection_ids[&type_];
2481        let updates = updates.into_iter().map(|update| update.into()).collect();
2482        self.collection_manager.blind_write(id, updates);
2483    }
2484
2485    fn append_status_introspection_updates(
2486        &mut self,
2487        type_: IntrospectionType,
2488        updates: Vec<StatusUpdate>,
2489    ) {
2490        let id = self.introspection_ids[&type_];
2491        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2492        if !updates.is_empty() {
2493            self.collection_manager.blind_write(id, updates);
2494        }
2495    }
2496
2497    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2498        let id = self.introspection_ids[&type_];
2499        self.collection_manager.differential_write(id, op);
2500    }
2501
2502    fn append_only_introspection_tx(
2503        &self,
2504        type_: IntrospectionType,
2505    ) -> mpsc::UnboundedSender<(
2506        Vec<AppendOnlyUpdate>,
2507        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2508    )> {
2509        let id = self.introspection_ids[&type_];
2510        self.collection_manager.append_only_write_sender(id)
2511    }
2512
2513    fn differential_introspection_tx(
2514        &self,
2515        type_: IntrospectionType,
2516    ) -> mpsc::UnboundedSender<(
2517        StorageWriteOp,
2518        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2519    )> {
2520        let id = self.introspection_ids[&type_];
2521        self.collection_manager.differential_write_sender(id)
2522    }
2523
2524    async fn real_time_recent_timestamp(
2525        &self,
2526        timestamp_objects: BTreeSet<GlobalId>,
2527        timeout: Duration,
2528    ) -> Result<
2529        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
2530        StorageError<Self::Timestamp>,
2531    > {
2532        use mz_storage_types::sources::GenericSourceConnection;
2533
2534        let mut rtr_futures = BTreeMap::new();
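        // For each eligible user source below, we build a timeout-wrapped future
        // that computes that source's real-time-recency timestamp; the per-source
        // results are combined into a single timestamp at the end.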
2535
2536        // Only user sources can be read from with RTR.
2537        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
2538            let collection = match self.collection(id) {
2539                Ok(c) => c,
2540                // Not a storage item, which we accept.
2541                Err(_) => continue,
2542            };
2543
2544            let (source_conn, remap_id) = match &collection.data_source {
2545                DataSource::Ingestion(IngestionDescription {
2546                    desc: SourceDesc { connection, .. },
2547                    remap_collection_id,
2548                    ..
2549                }) => match connection {
2550                    GenericSourceConnection::Kafka(_)
2551                    | GenericSourceConnection::Postgres(_)
2552                    | GenericSourceConnection::MySql(_)
2553                    | GenericSourceConnection::SqlServer(_) => {
2554                        (connection.clone(), *remap_collection_id)
2555                    }
2556
2557                    // These internal sources do not yet (and might never)
2558                    // support RTR. However, erroring when they're selected
2559                    // from makes for an annoying user experience, so we just
2560                    // skip over them instead.
2561                    GenericSourceConnection::LoadGenerator(_) => continue,
2562                },
2563                // Skip over all other objects
2564                _ => {
2565                    continue;
2566                }
2567            };
2568
2569            // Prepare for getting the external system's frontier.
2570            let config = self.config().clone();
2571
2572            // Determine the remap collection we plan to read from.
2573            //
2574            // Note that the process of reading from the remap shard is the
2575            // same as in other areas of this code, but we inline it here
2576            // because we must prove that we have not taken ownership of
2577            // `self` in order to move the stream of data from the remap
2578            // shard into a future.
2579            let read_handle = self.read_handle_for_snapshot(remap_id).await?;
2580
2581            // Have to acquire a read hold to prevent the since from advancing
2582            // while we read.
2583            let remap_read_hold = self
2584                .storage_collections
2585                .acquire_read_holds(vec![remap_id])
2586                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
2587                .expect_element(|| "known to be exactly one");
2588
2589            let remap_as_of = remap_read_hold
2590                .since()
2591                .to_owned()
2592                .into_option()
2593                .ok_or(StorageError::ReadBeforeSince(remap_id))?;
2594
2595            rtr_futures.insert(
2596                id,
2597                tokio::time::timeout(timeout, async move {
2598                    use mz_storage_types::sources::SourceConnection as _;
2599
2600                    // Fetch the remap shard's contents; we must do this first so
2601                    // that the `as_of` doesn't change.
2602                    let as_of = Antichain::from_elem(remap_as_of);
2603                    let remap_subscribe = read_handle
2604                        .subscribe(as_of.clone())
2605                        .await
2606                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;
2607
2608                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");
2609
2610                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
2611                        .await.map_err(|e| {
2612                            tracing::debug!(?id, "real time recency error: {:?}", e);
2613                            e
2614                        });
2615
2616                    // Drop the read hold once we have read successfully.
2617                    drop(remap_read_hold);
2618
2619                    result
2620                }),
2621            );
2622        }
2623
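        // Await all per-source futures and fold their results into the maximum
        // timestamp; a timeout on any single source fails the whole request with
        // `RtrTimeout`.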
2624        Ok(Box::pin(async move {
2625            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
2626            ids.into_iter()
2627                .zip_eq(futures::future::join_all(futs).await)
2628                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
2629                    let new =
2630                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
2631                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
2632                })
2633        }))
2634    }
2635}
2636
2637/// Seed [`StorageTxn`] with any state required to instantiate a
2638/// [`StorageController`].
2639///
2640/// This cannot be a member of [`StorageController`] because it cannot take a
2641/// `self` parameter.
2642///
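/// # Example
///
/// A minimal sketch (illustrative; `txn` is assumed to be a `&mut dyn
/// StorageTxn<T>` and the controller's constructor arguments are elided):
///
/// ```ignore
/// // Ensure the txn-WAL shard is durably recorded before the controller exists.
/// prepare_initialization(txn)?;
/// let controller = Controller::new(/* build info, persist clients, ... */).await;
/// ```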
2643pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2644    if txn.get_txn_wal_shard().is_none() {
2645        let txns_id = ShardId::new();
2646        txn.write_txn_wal_shard(txns_id)?;
2647    }
2648
2649    Ok(())
2650}
2651
2652impl<T> Controller<T>
2653where
2654    T: Timestamp
2655        + Lattice
2656        + TotalOrder
2657        + Codec64
2658        + From<EpochMillis>
2659        + TimestampManipulation
2660        + Into<Datum<'static>>,
2661    StorageCommand<T>: RustType<ProtoStorageCommand>,
2662    StorageResponse<T>: RustType<ProtoStorageResponse>,
2663    Self: StorageController<Timestamp = T>,
2664{
2665    /// Create a new storage controller.
2666    ///
2667    /// Note that when creating a new storage controller, you must also
2668    /// reconcile it with the previous state.
2669    ///
2670    /// # Panics
2671    /// If this function is called before [`prepare_initialization`].
2672    pub async fn new(
2673        build_info: &'static BuildInfo,
2674        persist_location: PersistLocation,
2675        persist_clients: Arc<PersistClientCache>,
2676        now: NowFn,
2677        wallclock_lag: WallclockLagFn<T>,
2678        txns_metrics: Arc<TxnMetrics>,
2679        read_only: bool,
2680        metrics_registry: &MetricsRegistry,
2681        controller_metrics: ControllerMetrics,
2682        connection_context: ConnectionContext,
2683        txn: &dyn StorageTxn<T>,
2684        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
2685    ) -> Self {
2686        let txns_client = persist_clients
2687            .open(persist_location.clone())
2688            .await
2689            .expect("location should be valid");
2690
2691        let persist_warm_task = warm_persist_state_in_background(
2692            txns_client.clone(),
2693            txn.get_collection_metadata().into_values(),
2694        );
2695        let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2696
2697        // This value must already be installed because we must ensure it is
2698        // durably recorded before it is used; otherwise we risk leaking persist
2699        // state.
2700        let txns_id = txn
2701            .get_txn_wal_shard()
2702            .expect("must call prepare initialization before creating storage controller");
2703
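        // In read-only mode we only need to follow the txns shard's upper, so we
        // open a plain write handle in read-only (follower) mode; otherwise we
        // open a full txn-wal handle that can append to the shard.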
2704        let persist_table_worker = if read_only {
2705            let txns_write = txns_client
2706                .open_writer(
2707                    txns_id,
2708                    Arc::new(TxnsCodecRow::desc()),
2709                    Arc::new(UnitSchema),
2710                    Diagnostics {
2711                        shard_name: "txns".to_owned(),
2712                        handle_purpose: "follow txns upper".to_owned(),
2713                    },
2714                )
2715                .await
2716                .expect("txns schema shouldn't change");
2717            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2718        } else {
2719            let txns = TxnsHandle::open(
2720                T::minimum(),
2721                txns_client.clone(),
2722                txns_client.dyncfgs().clone(),
2723                Arc::clone(&txns_metrics),
2724                txns_id,
2725            )
2726            .await;
2727            persist_handles::PersistTableWriteWorker::new_txns(txns)
2728        };
2729        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2730
2731        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2732
2733        let introspection_ids = BTreeMap::new();
2734        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2735
2736        let (statistics_interval_sender, _) =
2737            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2738
2739        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2740            tokio::sync::mpsc::unbounded_channel();
2741
2742        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2743        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2744
2745        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2746
2747        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2748
2749        let now_dt = mz_ore::now::to_datetime(now());
2750
2751        Self {
2752            build_info,
2753            collections: BTreeMap::default(),
2754            dropped_objects: Default::default(),
2755            persist_table_worker,
2756            txns_read,
2757            txns_metrics,
2758            stashed_responses: vec![],
2759            pending_table_handle_drops_tx,
2760            pending_table_handle_drops_rx,
2761            pending_oneshot_ingestions: BTreeMap::default(),
2762            collection_manager,
2763            introspection_ids,
2764            introspection_tokens,
2765            now,
2766            read_only,
2767            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2768                source_statistics: BTreeMap::new(),
2769                webhook_statistics: BTreeMap::new(),
2770            })),
2771            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2772            statistics_interval_sender,
2773            instances: BTreeMap::new(),
2774            initialized: false,
2775            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2776            persist_location,
2777            persist: persist_clients,
2778            metrics,
2779            recorded_frontiers: BTreeMap::new(),
2780            recorded_replica_frontiers: BTreeMap::new(),
2781            wallclock_lag,
2782            wallclock_lag_last_recorded: now_dt,
2783            storage_collections,
2784            migrated_storage_collections: BTreeSet::new(),
2785            maintenance_ticker,
2786            maintenance_scheduled: false,
2787            instance_response_rx,
2788            instance_response_tx,
2789            persist_warm_task,
2790        }
2791    }
2792
2793    // This is different from `set_read_policies`, which is for external users.
2794    // This method is for setting the policy that the controller uses when
2795    // maintaining the read holds that it has for collections/exports at the
2796    // StorageCollections.
2797    //
2798    // This is really only used when dropping things, where we set the
2799    // ReadPolicy to the empty Antichain.
2800    #[instrument(level = "debug")]
2801    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
2802        let mut read_capability_changes = BTreeMap::default();
2803
2804        for (id, policy) in policies.into_iter() {
2805            if let Some(collection) = self.collections.get_mut(&id) {
2806                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2807                {
2808                    CollectionStateExtra::Ingestion(ingestion) => (
2809                        ingestion.write_frontier.borrow(),
2810                        &mut ingestion.derived_since,
2811                        &mut ingestion.hold_policy,
2812                    ),
2813                    CollectionStateExtra::None => {
2814                        unreachable!("set_hold_policies is only called for ingestions");
2815                    }
2816                    CollectionStateExtra::Export(export) => (
2817                        export.write_frontier.borrow(),
2818                        &mut export.derived_since,
2819                        &mut export.read_policy,
2820                    ),
2821                };
2822
2823                let new_derived_since = policy.frontier(write_frontier);
2824                let mut update = swap_updates(derived_since, new_derived_since);
2825                if !update.is_empty() {
2826                    read_capability_changes.insert(id, update);
2827                }
2828
2829                *hold_policy = policy;
2830            }
2831        }
2832
2833        if !read_capability_changes.is_empty() {
2834            self.update_hold_capabilities(&mut read_capability_changes);
2835        }
2836    }
2837
2838    #[instrument(level = "debug", fields(updates))]
2839    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2840        let mut read_capability_changes = BTreeMap::default();
2841
2842        if let Some(collection) = self.collections.get_mut(&id) {
2843            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2844                CollectionStateExtra::Ingestion(ingestion) => (
2845                    &mut ingestion.write_frontier,
2846                    &mut ingestion.derived_since,
2847                    &ingestion.hold_policy,
2848                ),
2849                CollectionStateExtra::None => {
2850                    if matches!(collection.data_source, DataSource::Progress) {
2851                        // We do get these, but can't do anything with them!
2852                    } else {
2853                        tracing::error!(
2854                            ?collection,
2855                            ?new_upper,
2856                            "updated write frontier for collection which is not an ingestion"
2857                        );
2858                    }
2859                    return;
2860                }
2861                CollectionStateExtra::Export(export) => (
2862                    &mut export.write_frontier,
2863                    &mut export.derived_since,
2864                    &export.read_policy,
2865                ),
2866            };
2867
2868            if PartialOrder::less_than(write_frontier, new_upper) {
2869                write_frontier.clone_from(new_upper);
2870            }
2871
2872            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2873            let mut update = swap_updates(derived_since, new_derived_since);
2874            if !update.is_empty() {
2875                read_capability_changes.insert(id, update);
2876            }
2877        } else if self.dropped_objects.contains_key(&id) {
2878            // We dropped an object but might still get updates from cluster
2879            // side, before it notices the drop. This is expected and fine.
2880        } else {
2881            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2882        }
2883
2884        if !read_capability_changes.is_empty() {
2885            self.update_hold_capabilities(&mut read_capability_changes);
2886        }
2887    }
2888
2889    // This is different from `update_read_capabilities`, which is for external users.
2890    // This method is for maintaining the read holds that the controller has at
2891    // the StorageCollections, for storage dependencies.
2892    #[instrument(level = "debug", fields(updates))]
2893    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2894        // Location to record consequences that we need to act on.
2895        let mut collections_net = BTreeMap::new();
2896
2897        // We must not rely on any specific relative ordering of `GlobalId`s.
2898        // That said, it is reasonable to assume that collections generally have
2899        // greater IDs than their dependencies, so starting with the largest is
2900        // a useful optimization.
2901        while let Some(key) = updates.keys().rev().next().cloned() {
2902            let mut update = updates.remove(&key).unwrap();
2903
2904            if key.is_user() {
2905                debug!(id = %key, ?update, "update_hold_capability");
2906            }
2907
2908            if let Some(collection) = self.collections.get_mut(&key) {
2909                match &mut collection.extra_state {
2910                    CollectionStateExtra::Ingestion(ingestion) => {
2911                        let changes = ingestion.read_capabilities.update_iter(update.drain());
2912                        update.extend(changes);
2913
2914                        let (changes, frontier, _cluster_id) =
2915                            collections_net.entry(key).or_insert_with(|| {
2916                                (
2917                                    <ChangeBatch<_>>::new(),
2918                                    Antichain::new(),
2919                                    ingestion.instance_id,
2920                                )
2921                            });
2922
2923                        changes.extend(update.drain());
2924                        *frontier = ingestion.read_capabilities.frontier().to_owned();
2925                    }
2926                    CollectionStateExtra::None => {
2927                        // WIP: See if this ever panics in CI.
2928                        soft_panic_or_log!(
2929                            "trying to update holds for collection {collection:?} which is not \
2930                             an ingestion: {update:?}"
2931                        );
2932                        continue;
2933                    }
2934                    CollectionStateExtra::Export(export) => {
2935                        let changes = export.read_capabilities.update_iter(update.drain());
2936                        update.extend(changes);
2937
2938                        let (changes, frontier, _cluster_id) =
2939                            collections_net.entry(key).or_insert_with(|| {
2940                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
2941                            });
2942
2943                        changes.extend(update.drain());
2944                        *frontier = export.read_capabilities.frontier().to_owned();
2945                    }
2946                }
2947            } else {
2948                // This is confusing and we should probably error.
2949                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
2950            }
2951        }
2952
2953        // Translate the net capability changes into `AllowCompaction` commands
2954        // and downgrade persist sinces.
2955        for (key, (mut changes, frontier, cluster_id)) in collections_net {
2956            if !changes.is_empty() {
2957                if key.is_user() {
2958                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
2959                }
2960
2961                let collection = self
2962                    .collections
2963                    .get_mut(&key)
2964                    .expect("missing collection state");
2965
2966                let read_holds = match &mut collection.extra_state {
2967                    CollectionStateExtra::Ingestion(ingestion) => {
2968                        ingestion.dependency_read_holds.as_mut_slice()
2969                    }
2970                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
2971                    CollectionStateExtra::None => {
2972                        soft_panic_or_log!(
2973                            "trying to downgrade read holds for collection which is not an \
2974                             ingestion: {collection:?}"
2975                        );
2976                        continue;
2977                    }
2978                };
2979
2980                for read_hold in read_holds.iter_mut() {
2981                    read_hold
2982                        .try_downgrade(frontier.clone())
2983                        .expect("we only advance the frontier");
2984                }
2985
2986                // Send AllowCompaction command directly to the instance
2987                if let Some(instance) = self.instances.get_mut(&cluster_id) {
2988                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
2989                } else {
2990                    soft_panic_or_log!(
2991                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
2992                    );
2993                }
2994            }
2995        }
2996    }
2997
2998    /// Validate that a collection exists for all identifiers, and error if any do not.
2999    fn validate_collection_ids(
3000        &self,
3001        ids: impl Iterator<Item = GlobalId>,
3002    ) -> Result<(), StorageError<T>> {
3003        for id in ids {
3004            self.storage_collections.check_exists(id)?;
3005        }
3006        Ok(())
3007    }
3008
3009    /// Validate that an export exists for all identifiers, and error if any do not.
3010    fn validate_export_ids(
3011        &self,
3012        ids: impl Iterator<Item = GlobalId>,
3013    ) -> Result<(), StorageError<T>> {
3014        for id in ids {
3015            self.export(id)?;
3016        }
3017        Ok(())
3018    }
3019
3020    /// Opens a write handle for the given `shard`, attributing diagnostics to `id`.
3021    ///
3022    /// After opening the handle, this fetches the shard's most recent upper so that
3023    /// the handle's cached upper is up to date (see the note in the body for why
3024    /// this matters).
3027    async fn open_data_handles(
3028        &self,
3029        id: &GlobalId,
3030        shard: ShardId,
3031        relation_desc: RelationDesc,
3032        persist_client: &PersistClient,
3033    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
3034        let diagnostics = Diagnostics {
3035            shard_name: id.to_string(),
3036            handle_purpose: format!("controller data for {}", id),
3037        };
3038
3039        let mut write = persist_client
3040            .open_writer(
3041                shard,
3042                Arc::new(relation_desc),
3043                Arc::new(UnitSchema),
3044                diagnostics.clone(),
3045            )
3046            .await
3047            .expect("invalid persist usage");
3048
3049        // N.B.
3050        // Fetch the most recent upper for the write handle. Otherwise, this may be behind
3051        // the since of the since handle. It's vital this happens AFTER we create
3052        // the since handle as it needs to be linearized with that operation. It may be true
3053        // that creating the write handle after the since handle already ensures this, but we
3054        // do this out of an abundance of caution.
3055        //
3056        // Note that this returns the upper, but also sets it on the handle to be fetched later.
3057        write.fetch_recent_upper().await;
3058
3059        write
3060    }
3061
3062    /// Registers the given introspection collection and does any preparatory
3063    /// work that we have to do before we start writing to it. This
3064    /// preparatory work will include partial truncation or other cleanup
3065    /// schemes, depending on introspection type.
3066    fn register_introspection_collection(
3067        &mut self,
3068        id: GlobalId,
3069        introspection_type: IntrospectionType,
3070        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
3071        persist_client: PersistClient,
3072    ) -> Result<(), StorageError<T>> {
3073        tracing::info!(%id, ?introspection_type, "registering introspection collection");
3074
3075        // In read-only mode we create a new shard for all migrated storage collections, so we
3076        // "trick" the write task into thinking that it's not in read-only mode, ensuring that
3077        // something is advancing this new shard.
3078        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
3079        if force_writable {
3080            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
3081            info!("writing to migrated storage collection {id} in read-only mode");
3082        }
3083
3084        let prev = self.introspection_ids.insert(introspection_type, id);
3085        assert!(
3086            prev.is_none(),
3087            "cannot have multiple IDs for introspection type"
3088        );
3089
3090        let metadata = self.storage_collections.collection_metadata(id)?.clone();
3091
3092        let read_handle_fn = move || {
3093            let persist_client = persist_client.clone();
3094            let metadata = metadata.clone();
3095
3096            let fut = async move {
3097                let read_handle = persist_client
3098                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
3099                        metadata.data_shard,
3100                        Arc::new(metadata.relation_desc.clone()),
3101                        Arc::new(UnitSchema),
3102                        Diagnostics {
3103                            shard_name: id.to_string(),
3104                            handle_purpose: format!("snapshot {}", id),
3105                        },
3106                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3107                    )
3108                    .await
3109                    .expect("invalid persist usage");
3110                read_handle
3111            };
3112
3113            fut.boxed()
3114        };
3115
3116        let recent_upper = write_handle.shared_upper();
3117
3118        match CollectionManagerKind::from(&introspection_type) {
3119            // For these, we first register the collection and then prepare it,
3120            // because the code that prepares differential collections expects to
3121            // be able to update desired state via the collection manager
3122            // already.
3123            CollectionManagerKind::Differential => {
3124                let statistics_retention_duration =
3125                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());
3126
3127                // These are cheap, shallow (mostly reference-counted) copies.
3128                let introspection_config = DifferentialIntrospectionConfig {
3129                    recent_upper,
3130                    introspection_type,
3131                    storage_collections: Arc::clone(&self.storage_collections),
3132                    collection_manager: self.collection_manager.clone(),
3133                    source_statistics: Arc::clone(&self.source_statistics),
3134                    sink_statistics: Arc::clone(&self.sink_statistics),
3135                    statistics_interval: self.config.parameters.statistics_interval.clone(),
3136                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
3137                    statistics_retention_duration,
3138                    metrics: self.metrics.clone(),
3139                    introspection_tokens: Arc::clone(&self.introspection_tokens),
3140                };
3141                self.collection_manager.register_differential_collection(
3142                    id,
3143                    write_handle,
3144                    read_handle_fn,
3145                    force_writable,
3146                    introspection_config,
3147                );
3148            }
3149            // For these, we first have to prepare and then register with the
3150            // collection manager, because the preparation logic wants to read
3151            // the shard's contents and then do uncontested writes.
3152            //
3153            // TODO(aljoscha): We should make the truncation/cleanup work that
3154            // happens when we take over instead be a periodic thing, and make
3155            // it resilient to the upper moving concurrently.
3156            CollectionManagerKind::AppendOnly => {
3157                let introspection_config = AppendOnlyIntrospectionConfig {
3158                    introspection_type,
3159                    config_set: Arc::clone(self.config.config_set()),
3160                    parameters: self.config.parameters.clone(),
3161                    storage_collections: Arc::clone(&self.storage_collections),
3162                };
3163                self.collection_manager.register_append_only_collection(
3164                    id,
3165                    write_handle,
3166                    force_writable,
3167                    Some(introspection_config),
3168                );
3169            }
3170        }
3171
3172        Ok(())
3173    }
3174
3175    /// Remove statistics for sources/sinks that were dropped but still have statistics rows
3176    /// hanging around.
3177    fn reconcile_dangling_statistics(&self) {
3178        self.source_statistics
3179            .lock()
3180            .expect("poisoned")
3181            .source_statistics
3182            // collections should also contain subsources.
3183            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3184        self.sink_statistics
3185            .lock()
3186            .expect("poisoned")
3187            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3188    }
3189
3190    /// Appends a new (global ID, shard ID) pair to the appropriate collection.
3191    /// Use a `diff` of 1 to append a new entry; -1 to retract an existing
3192    /// entry.
3193    ///
3194    /// # Panics
3195    /// - If `self.collections` does not have an entry for `global_id`.
3196    /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
3197    ///   a managed collection.
3198    /// - If diff is any value other than `1` or `-1`.
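    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative; `new_ids` is a hypothetical collection of
    /// newly created `GlobalId`s):
    ///
    /// ```ignore
    /// // Record the shard mappings for newly created collections.
    /// self.append_shard_mappings(new_ids.into_iter(), Diff::ONE);
    /// ```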
3199    #[instrument(level = "debug")]
3200    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3201    where
3202        I: Iterator<Item = GlobalId>,
3203    {
3204        mz_ore::soft_assert_or_log!(
3205            diff == Diff::MINUS_ONE || diff == Diff::ONE,
3206            "use 1 for insert or -1 for delete"
3207        );
3208
3209        let id = *self
3210            .introspection_ids
3211            .get(&IntrospectionType::ShardMapping)
3212            .expect("should be registered before this call");
3213
3214        let mut updates = vec![];
3215        // Pack updates into rows
3216        let mut row_buf = Row::default();
3217
3218        for global_id in global_ids {
3219            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3220                collection.collection_metadata.data_shard.clone()
3221            } else {
3222                panic!("unknown global id: {}", global_id);
3223            };
3224
3225            let mut packer = row_buf.packer();
3226            packer.push(Datum::from(global_id.to_string().as_str()));
3227            packer.push(Datum::from(shard_id.to_string().as_str()));
3228            updates.push((row_buf.clone(), diff));
3229        }
3230
3231        self.collection_manager.differential_append(id, updates);
3232    }
3233
3234    /// Determines and returns this collection's dependencies, if any.
3235    fn determine_collection_dependencies(
3236        &self,
3237        self_id: GlobalId,
3238        data_source: &DataSource<T>,
3239    ) -> Result<Vec<GlobalId>, StorageError<T>> {
3240        let dependency = match &data_source {
3241            DataSource::Introspection(_)
3242            | DataSource::Webhook
3243            | DataSource::Table { primary: None }
3244            | DataSource::Progress
3245            | DataSource::Other => vec![],
3246            DataSource::Table {
3247                primary: Some(primary),
3248            } => vec![*primary],
3249            DataSource::IngestionExport { ingestion_id, .. } => {
3250                // Ingestion exports depend on their primary source's remap
3251                // collection.
3252                let source_collection = self.collection(*ingestion_id)?;
3253                let ingestion_remap_collection_id = match &source_collection.data_source {
3254                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3255                    _ => unreachable!(
3256                        "SourceExport must only refer to primary sources that already exist"
3257                    ),
3258                };
3259
3260                // Ingestion exports (aka. subsources) must make sure that 1)
3261                // their own collection's since stays one step behind the upper,
3262                // and, 2) that the remap shard's since stays one step behind
3263                // their upper. Hence they track themselves and the remap shard
3264                // as dependencies.
3265                vec![self_id, ingestion_remap_collection_id]
3266            }
3267            // Ingestions depend on their remap collection.
3268            DataSource::Ingestion(ingestion) => {
3269                // Ingestions must make sure that 1) their own collection's
3270                // since stays one step behind the upper, and, 2) that the remap
3271                // shard's since stays one step behind their upper. Hence they
3272                // track themselves and the remap shard as dependencies.
3273                vec![self_id, ingestion.remap_collection_id]
3274            }
3275            DataSource::Sink { desc } => {
3276                // Sinks hold back their own frontier and the frontier of their input.
3277                vec![self_id, desc.sink.from]
3278            }
3279        };
3280
3281        Ok(dependency)
3282    }
3283
3284    async fn read_handle_for_snapshot(
3285        &self,
3286        id: GlobalId,
3287    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3288        let metadata = self.storage_collections.collection_metadata(id)?;
3289        read_handle_for_snapshot(&self.persist, id, &metadata).await
3290    }
3291
3292    /// Handles writing of status updates for sources/sinks to the appropriate
3293    /// status relation.
3294    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3295        if self.read_only {
3296            return;
3297        }
3298
3299        let mut sink_status_updates = vec![];
3300        let mut source_status_updates = vec![];
3301
3302        for update in updates {
3303            let id = update.id;
3304            if self.export(id).is_ok() {
3305                sink_status_updates.push(update);
3306            } else if self.storage_collections.check_exists(id).is_ok() {
3307                source_status_updates.push(update);
3308            }
3309        }
3310
3311        self.append_status_introspection_updates(
3312            IntrospectionType::SourceStatusHistory,
3313            source_status_updates,
3314        );
3315        self.append_status_introspection_updates(
3316            IntrospectionType::SinkStatusHistory,
3317            sink_status_updates,
3318        );
3319    }
3320
3321    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3322        self.collections
3323            .get(&id)
3324            .ok_or(StorageError::IdentifierMissing(id))
3325    }
3326
3327    /// Runs the identified ingestion using the current definition of the
3328    /// ingestion in-memory.
3329    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3330        tracing::info!(%id, "starting ingestion");
3331
3332        let collection = self.collection(id)?;
3333        let ingestion_description = match &collection.data_source {
3334            DataSource::Ingestion(i) => i.clone(),
3335            _ => {
3336                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3337                Err(StorageError::IdentifierInvalid(id))?
3338            }
3339        };
3340
3341        // Enrich all of the exports with their metadata
3342        let mut source_exports = BTreeMap::new();
3343        for (
3344            export_id,
3345            SourceExport {
3346                storage_metadata: (),
3347                details,
3348                data_config,
3349            },
3350        ) in ingestion_description.source_exports.clone()
3351        {
3352            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3353            source_exports.insert(
3354                export_id,
3355                SourceExport {
3356                    storage_metadata: export_storage_metadata,
3357                    details,
3358                    data_config,
3359                },
3360            );
3361        }
3362
3363        let description = IngestionDescription::<CollectionMetadata> {
3364            source_exports,
3365            // The ingestion metadata is simply the collection metadata of the collection with
3366            // the associated ingestion
3367            ingestion_metadata: collection.collection_metadata.clone(),
3368            // The rest of the fields are identical
3369            desc: ingestion_description.desc.clone(),
3370            instance_id: ingestion_description.instance_id,
3371            remap_collection_id: ingestion_description.remap_collection_id,
3372        };
3373
3374        let storage_instance_id = description.instance_id;
3375        // Fetch the client for this ingestion's instance.
3376        let instance = self
3377            .instances
3378            .get_mut(&storage_instance_id)
3379            .ok_or_else(|| StorageError::IngestionInstanceMissing {
3380                storage_instance_id,
3381                ingestion_id: id,
3382            })?;
3383
3384        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3385        instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3386
3387        Ok(())
3388    }
3389
3390    /// Runs the identified export using the current definition of the export
3391    /// that we have in memory.
3392    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3393        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3394            return Err(StorageError::IdentifierMissing(id));
3395        };
3396
3397        let from_storage_metadata = self
3398            .storage_collections
3399            .collection_metadata(description.sink.from)?;
3400        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3401
3402        // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
3403        // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
3404        // reading it; otherwise assume we may have to replay from the beginning.
3405        let enable_snapshot_frontier =
3406            dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER.get(self.config().config_set());
3407        let export_state = self.storage_collections.collection_frontiers(id)?;
3408        let mut as_of = description.sink.as_of.clone();
3409        as_of.join_assign(&export_state.implied_capability);
3410        let with_snapshot = if enable_snapshot_frontier
3411            && PartialOrder::less_than(&as_of, &export_state.write_frontier)
3412        {
3413            false
3414        } else {
3415            description.sink.with_snapshot
3416        };
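        // For example (illustrative): with the snapshot-frontier flag enabled, a
        // sink whose as_of is {5} while its write frontier is {10} must already
        // have written its snapshot, so we resume without emitting it again.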
3417
3418        info!(
3419            sink_id = %id,
3420            from_id = %description.sink.from,
3421            write_frontier = ?export_state.write_frontier,
3422            ?as_of,
3423            ?with_snapshot,
3424            "run_export"
3425        );
3426
3427        let cmd = RunSinkCommand {
3428            id,
3429            description: StorageSinkDesc {
3430                from: description.sink.from,
3431                from_desc: description.sink.from_desc.clone(),
3432                connection: description.sink.connection.clone(),
3433                envelope: description.sink.envelope,
3434                as_of,
3435                version: description.sink.version,
3436                from_storage_metadata,
3437                with_snapshot,
3438                to_storage_metadata,
3439            },
3440        };
3441
3442        let storage_instance_id = description.instance_id.clone();
3443
3444        let instance = self
3445            .instances
3446            .get_mut(&storage_instance_id)
3447            .ok_or_else(|| StorageError::ExportInstanceMissing {
3448                storage_instance_id,
3449                export_id: id,
3450            })?;
3451
3452        instance.send(StorageCommand::RunSink(Box::new(cmd)));
3453
3454        Ok(())
3455    }
3456
3457    /// Update introspection with the current frontiers of storage objects.
3458    ///
3459    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3460    /// second during normal operation.
3461    fn update_frontier_introspection(&mut self) {
3462        let mut global_frontiers = BTreeMap::new();
3463        let mut replica_frontiers = BTreeMap::new();
3464
3465        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
3466            let id = collection_frontiers.id;
3467            let since = collection_frontiers.read_capabilities;
3468            let upper = collection_frontiers.write_frontier;
3469
3470            let instance = self
3471                .collections
3472                .get(&id)
3473                .and_then(|collection_state| match &collection_state.extra_state {
3474                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
3475                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
3476                    CollectionStateExtra::None => None,
3477                })
3478                .and_then(|i| self.instances.get(&i));
3479
3480            if let Some(instance) = instance {
3481                for replica_id in instance.replica_ids() {
3482                    replica_frontiers.insert((id, replica_id), upper.clone());
3483                }
3484            }
3485
3486            global_frontiers.insert(id, (since, upper));
3487        }
3488
3489        let mut global_updates = Vec::new();
3490        let mut replica_updates = Vec::new();
3491
3492        let mut push_global_update =
3493            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
3494                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
3495                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3496                let row = Row::pack_slice(&[
3497                    Datum::String(&id.to_string()),
3498                    read_frontier,
3499                    write_frontier,
3500                ]);
3501                global_updates.push((row, diff));
3502            };
3503
3504        let mut push_replica_update =
3505            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
3506                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3507                let row = Row::pack_slice(&[
3508                    Datum::String(&id.to_string()),
3509                    Datum::String(&replica_id.to_string()),
3510                    write_frontier,
3511                ]);
3512                replica_updates.push((row, diff));
3513            };
3514
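        // Diff the newly observed frontiers against the previously recorded ones:
        // new or changed entries are inserted (Diff::ONE) and stale entries are
        // retracted (Diff::MINUS_ONE), keeping the introspection collections in
        // sync with the current frontiers.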
3515        let mut old_global_frontiers =
3516            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
3517        for (&id, new) in &self.recorded_frontiers {
3518            match old_global_frontiers.remove(&id) {
3519                Some(old) if &old != new => {
3520                    push_global_update(id, new.clone(), Diff::ONE);
3521                    push_global_update(id, old, Diff::MINUS_ONE);
3522                }
3523                Some(_) => (),
3524                None => push_global_update(id, new.clone(), Diff::ONE),
3525            }
3526        }
3527        for (id, old) in old_global_frontiers {
3528            push_global_update(id, old, Diff::MINUS_ONE);
3529        }
3530
3531        let mut old_replica_frontiers =
3532            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
3533        for (&key, new) in &self.recorded_replica_frontiers {
3534            match old_replica_frontiers.remove(&key) {
3535                Some(old) if &old != new => {
3536                    push_replica_update(key, new.clone(), Diff::ONE);
3537                    push_replica_update(key, old, Diff::MINUS_ONE);
3538                }
3539                Some(_) => (),
3540                None => push_replica_update(key, new.clone(), Diff::ONE),
3541            }
3542        }
3543        for (key, old) in old_replica_frontiers {
3544            push_replica_update(key, old, Diff::MINUS_ONE);
3545        }
3546
3547        let id = self.introspection_ids[&IntrospectionType::Frontiers];
3548        self.collection_manager
3549            .differential_append(id, global_updates);
3550
3551        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
3552        self.collection_manager
3553            .differential_append(id, replica_updates);
3554    }
3555
3556    /// Refresh the wallclock lag introspection and metrics with the current lag values.
3557    ///
3558    /// This method produces wallclock lag metrics of two different shapes:
3559    ///
3560    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
3561    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
3562    ///   over the last minute, together with the current time.
3563    /// * Histograms: For each collection, we measure the lag of the write frontier behind
3564    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
3565    ///   together with the current histogram period.
3566    ///
3567    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
3568    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
3569    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
3570    /// Prometheus.
3571    ///
3572    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3573    /// second during normal operation.
3574    fn refresh_wallclock_lag(&mut self) {
3575        let now_ms = (self.now)();
3576        let histogram_period =
3577            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3578
3579        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
3580            Some(ts) => (self.wallclock_lag)(ts.clone()),
3581            None => Duration::ZERO,
3582        };
3583
3584        for frontiers in self.storage_collections.active_collection_frontiers() {
3585            let id = frontiers.id;
3586            let Some(collection) = self.collections.get_mut(&id) else {
3587                continue;
3588            };
3589
3590            let collection_unreadable =
3591                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
3592            let lag = if collection_unreadable {
3593                WallclockLag::Undefined
3594            } else {
3595                let lag = frontier_lag(&frontiers.write_frontier);
3596                WallclockLag::Seconds(lag.as_secs())
3597            };
3598
3599            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);
3600
3601            // No way to specify values as undefined in Prometheus metrics, so we use the
3602            // maximum value instead.
3603            let secs = lag.unwrap_seconds_or(u64::MAX);
3604            collection.wallclock_lag_metrics.observe(secs);
3605
3606            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
3607                continue;
3608            }
3609
3610            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
3611                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());
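                // Power-of-two bucketing: e.g. (illustrative) a lag of 5 seconds
                // falls into the 8-second bucket.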
3612
3613                let instance_id = match &collection.extra_state {
3614                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3615                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3616                    CollectionStateExtra::None => None,
3617                };
3618                let workload_class = instance_id
3619                    .and_then(|id| self.instances.get(&id))
3620                    .and_then(|i| i.workload_class.clone());
3621                let labels = match workload_class {
3622                    Some(wc) => [("workload_class", wc.clone())].into(),
3623                    None => BTreeMap::new(),
3624                };
3625
3626                let key = (histogram_period, bucket, labels);
3627                *stash.entry(key).or_default() += Diff::ONE;
3628            }
3629        }
3630
3631        // Record lags to persist, if it's time.
3632        self.maybe_record_wallclock_lag();
3633    }
3634
3635    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
3636    /// last recording.
3637    ///
3638    /// We emit new introspection updates if the system time has passed into a new multiple of the
3639    /// recording interval (typically 1 minute) since the last refresh. The compute controller uses
3640    /// the same approach, ensuring that both controllers commit their lags at roughly the same
3641    /// time, avoiding confusion caused by inconsistencies.
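    ///
    /// For example (illustrative): with a 60-second recording interval, if the last recording
    /// happened at 12:05:00 and the current time still truncates to 12:05:00, nothing is
    /// emitted; once the current time truncates to 12:06:00, new lag updates are recorded and
    /// `wallclock_lag_last_recorded` advances to 12:06:00.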
3642    fn maybe_record_wallclock_lag(&mut self) {
3643        if self.read_only {
3644            return;
3645        }
3646
3647        let duration_trunc = |datetime: DateTime<_>, interval| {
3648            let td = TimeDelta::from_std(interval).ok()?;
3649            datetime.duration_trunc(td).ok()
3650        };
3651
3652        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3653        let now_dt = mz_ore::now::to_datetime((self.now)());
3654        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3655            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3656            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3657            duration_trunc(now_dt, *default).unwrap()
3658        });
3659        if now_trunc <= self.wallclock_lag_last_recorded {
3660            return;
3661        }
3662
3663        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3664
3665        let mut history_updates = Vec::new();
3666        let mut histogram_updates = Vec::new();
3667        let mut row_buf = Row::default();
3668        for frontiers in self.storage_collections.active_collection_frontiers() {
3669            let id = frontiers.id;
3670            let Some(collection) = self.collections.get_mut(&id) else {
3671                continue;
3672            };
3673
3674            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
3675            let row = Row::pack_slice(&[
3676                Datum::String(&id.to_string()),
3677                Datum::Null,
3678                max_lag.into_interval_datum(),
3679                Datum::TimestampTz(now_ts),
3680            ]);
3681            history_updates.push((row, Diff::ONE));
3682
3683            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3684                continue;
3685            };
3686
3687            for ((period, lag, labels), count) in std::mem::take(stash) {
3688                let mut packer = row_buf.packer();
3689                packer.extend([
3690                    Datum::TimestampTz(period.start),
3691                    Datum::TimestampTz(period.end),
3692                    Datum::String(&id.to_string()),
3693                    lag.into_uint64_datum(),
3694                ]);
3695                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3696                packer.push_dict(labels);
3697
3698                histogram_updates.push((row_buf.clone(), count));
3699            }
3700        }
3701
3702        if !history_updates.is_empty() {
3703            self.append_introspection_updates(
3704                IntrospectionType::WallclockLagHistory,
3705                history_updates,
3706            );
3707        }
3708        if !histogram_updates.is_empty() {
3709            self.append_introspection_updates(
3710                IntrospectionType::WallclockLagHistogram,
3711                histogram_updates,
3712            );
3713        }
3714
3715        self.wallclock_lag_last_recorded = now_trunc;
3716    }
3717
3718    /// Run periodic tasks.
3719    ///
3720    /// This method is invoked roughly once per second during normal operation. It is a good place
3721    /// for tasks that need to run periodically, such as state cleanup or updating of metrics.
3722    fn maintain(&mut self) {
3723        self.update_frontier_introspection();
3724        self.refresh_wallclock_lag();
3725
3726        // Perform instance maintenance work.
3727        for instance in self.instances.values_mut() {
3728            instance.refresh_state_metrics();
3729        }
3730    }
3731}
3732
3733impl From<&IntrospectionType> for CollectionManagerKind {
3734    fn from(value: &IntrospectionType) -> Self {
3735        match value {
3736            IntrospectionType::ShardMapping
3737            | IntrospectionType::Frontiers
3738            | IntrospectionType::ReplicaFrontiers
3739            | IntrospectionType::StorageSourceStatistics
3740            | IntrospectionType::StorageSinkStatistics
3741            | IntrospectionType::ComputeDependencies
3742            | IntrospectionType::ComputeOperatorHydrationStatus
3743            | IntrospectionType::ComputeMaterializedViewRefreshes
3744            | IntrospectionType::ComputeErrorCounts
3745            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3746
3747            IntrospectionType::SourceStatusHistory
3748            | IntrospectionType::SinkStatusHistory
3749            | IntrospectionType::PrivatelinkConnectionStatusHistory
3750            | IntrospectionType::ReplicaStatusHistory
3751            | IntrospectionType::ReplicaMetricsHistory
3752            | IntrospectionType::WallclockLagHistory
3753            | IntrospectionType::WallclockLagHistogram
3754            | IntrospectionType::PreparedStatementHistory
3755            | IntrospectionType::StatementExecutionHistory
3756            | IntrospectionType::SessionHistory
3757            | IntrospectionType::StatementLifecycleHistory
3758            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3759        }
3760    }
3761}
3762
3763/// Get the current rows in the given statistics table. This is used to bootstrap
3764/// the statistics tasks.
3765///
3766// TODO(guswynn): we need to be more careful about the update time we get here:
3767// <https://github.com/MaterializeInc/database-issues/issues/7564>
3768async fn snapshot_statistics<T>(
3769    id: GlobalId,
3770    upper: Antichain<T>,
3771    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3772) -> Vec<Row>
3773where
3774    T: Codec64 + From<EpochMillis> + TimestampManipulation,
3775{
3776    match upper.as_option() {
3777        Some(f) if f > &T::minimum() => {
3778            let as_of = f.step_back().unwrap();
3779
3780            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3781            snapshot
3782                .into_iter()
3783                .map(|(row, diff)| {
3784                    assert_eq!(diff, 1);
3785                    row
3786                })
3787                .collect()
3788        }
3789        // If the collection is closed or the frontier is the minimum, we cannot
3790        // or don't need to read a snapshot (respectively).
3791        _ => Vec::new(),
3792    }
3793}
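
// A minimal usage sketch of the above (hypothetical caller; assumes the `collection_frontiers`
// accessor on `StorageCollections`): given an upper of `t + 1`, the snapshot is taken at
// `as_of = t`, the latest time that is already complete.
//
//     let upper = storage_collections.collection_frontiers(id)?.write_frontier;
//     let rows = snapshot_statistics(id, upper, &storage_collections).await;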
3794
3795async fn read_handle_for_snapshot<T>(
3796    persist: &PersistClientCache,
3797    id: GlobalId,
3798    metadata: &CollectionMetadata,
3799) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3800where
3801    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3802{
3803    let persist_client = persist
3804        .open(metadata.persist_location.clone())
3805        .await
3806        .unwrap();
3807
3808    // We create a new read handle every time someone requests a snapshot and then immediately
3809    // expire it, instead of keeping a read handle permanently in our state, to avoid having it
3810    // heartbeat continuously. The assumption is that calls to snapshot are rare, so it is worth
3811    // always creating a new handle.
3812    let read_handle = persist_client
3813        .open_leased_reader::<SourceData, (), _, _>(
3814            metadata.data_shard,
3815            Arc::new(metadata.relation_desc.clone()),
3816            Arc::new(UnitSchema),
3817            Diagnostics {
3818                shard_name: id.to_string(),
3819                handle_purpose: format!("snapshot {}", id),
3820            },
3821            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3822        )
3823        .await
3824        .expect("invalid persist usage");
3825    Ok(read_handle)
3826}
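
// A minimal usage sketch (hypothetical caller; assumes `ReadHandle::snapshot_and_fetch` and
// `ReadHandle::expire`): take the snapshot and expire the handle right away, as described above.
//
//     let mut handle = read_handle_for_snapshot(&persist, id, &metadata).await?;
//     let contents = handle.snapshot_and_fetch(as_of).await;
//     handle.expire().await;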
3827
3828/// State maintained about individual collections.
3829#[derive(Debug)]
3830struct CollectionState<T: TimelyTimestamp> {
3831    /// The source of this collection's data.
3832    pub data_source: DataSource<T>,
3833
3834    pub collection_metadata: CollectionMetadata,
3835
3836    pub extra_state: CollectionStateExtra<T>,
3837
3838    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3839    wallclock_lag_max: WallclockLag,
3840    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
3841    /// introspection update.
3842    ///
3843    /// Keys are `(period, lag, labels)` triples; values are counts.
3844    ///
3845    /// If this is `None`, wallclock lag is not tracked for this collection.
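    ///
    /// For example (hypothetical entry): `((period, lag_bucket, {"some_label": "value"}), 17)`
    /// means the given lag bucket was observed 17 times during `period` under that label set.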
3846    wallclock_lag_histogram_stash: Option<
3847        BTreeMap<
3848            (
3849                WallclockLagHistogramPeriod,
3850                WallclockLag,
3851                BTreeMap<&'static str, String>,
3852            ),
3853            Diff,
3854        >,
3855    >,
3856    /// Frontier wallclock lag metrics tracked for this collection.
3857    wallclock_lag_metrics: WallclockLagMetrics,
3858}
3859
3860impl<T: TimelyTimestamp> CollectionState<T> {
3861    fn new(
3862        data_source: DataSource<T>,
3863        collection_metadata: CollectionMetadata,
3864        extra_state: CollectionStateExtra<T>,
3865        wallclock_lag_metrics: WallclockLagMetrics,
3866    ) -> Self {
3867        // Only collect wallclock lag histogram data for collections written by storage, to avoid
3868        // duplicate measurements. Collections written by other components (e.g. compute) have
3869        // their wallclock lags recorded by those components instead.
3870        let wallclock_lag_histogram_stash = match &data_source {
3871            DataSource::Other => None,
3872            _ => Some(Default::default()),
3873        };
3874
3875        Self {
3876            data_source,
3877            collection_metadata,
3878            extra_state,
3879            wallclock_lag_max: WallclockLag::MIN,
3880            wallclock_lag_histogram_stash,
3881            wallclock_lag_metrics,
3882        }
3883    }
3884}
3885
3886/// Additional state that the controller maintains for select collection types.
3887#[derive(Debug)]
3888enum CollectionStateExtra<T: TimelyTimestamp> {
3889    Ingestion(IngestionState<T>),
3890    Export(ExportState<T>),
3891    None,
3892}
3893
3894/// State maintained about ingestions and ingestion exports.
3895#[derive(Debug)]
3896struct IngestionState<T: TimelyTimestamp> {
3897    /// Read capabilities on this ingestion, really only used to track changes to `derived_since`.
3898    pub read_capabilities: MutableAntichain<T>,
3899
3900    /// The current since frontier, derived from `write_frontier` using
3901    /// `hold_policy`.
3902    pub derived_since: Antichain<T>,
3903
3904    /// Holds that this ingestion (or ingestion export) has on its dependencies.
3905    pub dependency_read_holds: Vec<ReadHold<T>>,
3906
3907    /// Reported write frontier.
3908    pub write_frontier: Antichain<T>,
3909
3910    /// The policy that drives how we downgrade our read hold, that is, how we
3911    /// derive our since from our upper.
3912    ///
3913    /// This is a _storage-controller-internal_ policy that the controller uses to
3914    /// derive its own read hold on the collection. It should not be confused with
3915    /// any read policies that the adapter might install at [StorageCollections].
3916    pub hold_policy: ReadPolicy<T>,
3917
3918    /// The ID of the instance in which the ingestion is running.
3919    pub instance_id: StorageInstanceId,
3920
3921    /// Set of replica IDs on which this ingestion is hydrated.
3922    pub hydrated_on: BTreeSet<ReplicaId>,
3923}
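
// A minimal sketch of the `hold_policy` relationship (hypothetical numbers; assumes a lag-based
// policy such as `ReadPolicy::lag_writes_by`): with a configured lag of 10 and a
// `write_frontier` of `{42}`, the `derived_since` trails the upper by roughly the lag, i.e. it
// sits near `{32}`.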
3924
3925/// A description of a status history collection.
3926///
3927/// Used to inform partial truncation; see
3928/// [`collection_mgmt::partially_truncate_status_history`].
3929struct StatusHistoryDesc<K> {
3930    retention_policy: StatusHistoryRetentionPolicy,
3931    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
3932    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
3933}
3934enum StatusHistoryRetentionPolicy {
3935    /// Truncates everything but the last N updates for each key.
3936    LastN(usize),
3937    /// Truncates everything older than the time window for each key.
3938    TimeWindow(Duration),
3939}
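
// A minimal sketch of how a `StatusHistoryDesc` is consumed during partial truncation
// (hypothetical driver code; assumes `Row::unpack` from `mz_repr`):
//
//     let datums = row.unpack();
//     let key = (desc.extract_key)(&datums);
//     let occurred_at = (desc.extract_time)(&datums);
//     // Group rows by `key`, then retain them according to `desc.retention_policy`.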
3940
3941fn source_status_history_desc(
3942    params: &StorageParameters,
3943) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3944    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3945    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3946    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3947    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3948
3949    StatusHistoryDesc {
3950        retention_policy: StatusHistoryRetentionPolicy::LastN(
3951            params.keep_n_source_status_history_entries,
3952        ),
3953        extract_key: Box::new(move |datums| {
3954            (
3955                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3956                if datums[replica_id_idx].is_null() {
3957                    None
3958                } else {
3959                    Some(
3960                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3961                            .expect("ReplicaId column"),
3962                    )
3963                },
3964            )
3965        }),
3966        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3967    }
3968}
3969
3970fn sink_status_history_desc(
3971    params: &StorageParameters,
3972) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3973    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3974    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3975    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3976    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3977
3978    StatusHistoryDesc {
3979        retention_policy: StatusHistoryRetentionPolicy::LastN(
3980            params.keep_n_sink_status_history_entries,
3981        ),
3982        extract_key: Box::new(move |datums| {
3983            (
3984                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
3985                if datums[replica_id_idx].is_null() {
3986                    None
3987                } else {
3988                    Some(
3989                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3990                            .expect("ReplicaId column"),
3991                    )
3992                },
3993            )
3994        }),
3995        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3996    }
3997}
3998
3999fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
4000    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
4001    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
4002    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4003
4004    StatusHistoryDesc {
4005        retention_policy: StatusHistoryRetentionPolicy::LastN(
4006            params.keep_n_privatelink_status_history_entries,
4007        ),
4008        extract_key: Box::new(move |datums| {
4009            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4010        }),
4011        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4012    }
4013}
4014
4015fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4016    let desc = &REPLICA_STATUS_HISTORY_DESC;
4017    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4018    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4019    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4020
4021    StatusHistoryDesc {
4022        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4023            params.replica_status_history_retention_window,
4024        ),
4025        extract_key: Box::new(move |datums| {
4026            (
4027                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4028                datums[process_idx].unwrap_uint64(),
4029            )
4030        }),
4031        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4032    }
4033}
4034
4035/// Replace one antichain with another, tracking the overall changes in the returned `ChangeBatch`.
4036fn swap_updates<T: Timestamp>(
4037    from: &mut Antichain<T>,
4038    mut replace_with: Antichain<T>,
4039) -> ChangeBatch<T> {
4040    let mut update = ChangeBatch::new();
4041    if PartialOrder::less_equal(from, &replace_with) {
4042        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
4043        std::mem::swap(from, &mut replace_with);
4044        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
4045    }
4046    update
4047}
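
// A minimal sketch of the expected behavior (hypothetical single-element antichains):
//
//     let mut from = Antichain::from_elem(5u64);
//     let batch = swap_updates(&mut from, Antichain::from_elem(8u64));
//     // `from` is now {8}; `batch` records the changes (8, +1) and (5, -1).
//     // If `replace_with` were not ahead of `from`, `from` would be unchanged and `batch` empty.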