mz_storage_controller/
lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the storage controller trait.

use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
    WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::RustType;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
    TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to provide results of the ingestion.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// Cluster currently running this ingestion
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
    /// Consume the pending ingestion, responding with a cancelation message.
    ///
    /// TODO(cf2): Refine these error messages so they're not stringly typed.
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

/// A storage controller for a storage instance.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,

    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,

    /// Collections maintained by the storage controller.
    ///
    /// This collection only grows, although individual collections may be rendered unusable.
    /// This is to prevent the re-binding of identifiers to other descriptions.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Map from IDs of objects that have been dropped to replicas we are still
    /// expecting DroppedId messages from. This is cleared out once all replicas
    /// have responded.
    ///
    /// We use this only to catch problems in the protocol between controller
    /// and replicas; for example, it lets us differentiate between late messages
    /// for objects that have already been dropped and unexpected (read: erroneous)
    /// messages from the replica.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Write handle for table shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Channel for sending table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Channel for receiving table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Closures that can be used to send responses from oneshot ingestions.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Tracks which collection is responsible for which [`IntrospectionType`].
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for tasks that drive updating introspection collections. Dropping
    /// this will make sure that any tasks (or other resources) will stop when
    /// needed.
    // TODO(aljoscha): Should these live somewhere else?
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    // The following two fields must always be locked in order.
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates` messages, as
    /// well as webhook statistics.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates` messages.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// A way to update the statistics interval in the statistics tasks.
    statistics_interval_sender: Sender<Duration>,

    /// Clients for all known storage instances.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration to apply to newly provisioned instances, and use during purification.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections
    persist: Arc<PersistClientCache>,
    /// Metrics of the Storage controller
    metrics: StorageControllerMetrics,
    /// `(read, write)` frontiers that have been recorded in the `Frontiers` collection, kept to be
    /// able to retract old rows.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// Write frontiers that have been recorded in the `ReplicaFrontiers` collection, kept to be
    /// able to retract old rows.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function that computes the lag between the given time and wallclock time.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to a [StorageCollections].
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Migrated storage collections that can be written even in read only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Shared transmit channel for replicas to send responses.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receive end for replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task run at startup to warm persist state.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

/// Warm up persist state for `shard_ids` in a background task.
///
/// With better parallelism during startup this would likely be unnecessary, but empirically we see
/// some nice speedups with this relatively simple function.
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        // We serialize the dyncfg updates in StorageParameters, but configure
        // persist separately.
        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    /// Get the current configuration
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on zero-replica clusters are always considered
            // hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                // An ingestion is hydrated if it is hydrated on at least one replica.
                Ok(ingestion_state.hydrated_on.len() >= 1)
            }
            CollectionStateExtra::Export(_) => {
                // For now, sinks are always considered hydrated. We rely on
                // them starting up "instantly" and don't wait for them when
                // checking hydration status of a replica.  TODO(sinks): base
                // this off of the sink shard's frontier?
                Ok(true)
            }
            CollectionStateExtra::None => {
                // For now, objects that are not ingestions are always
                // considered hydrated. These are tables and webhooks, as of
                // today.
                Ok(true)
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        // If an empty set of replicas is provided, there can be no collections
        // on them, so we count this as hydrated.
        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
            return Ok(true);
        }

        // If target_replica_ids is provided, use it as the set of target
        // replicas. Otherwise check for hydration on any replica.
        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    if &state.instance_id != target_cluster_id {
                        continue;
                    }
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => {
                            // No target replicas given, so check that it's
                            // hydrated on at least one replica.
                            state.hydrated_on.len() >= 1
                        }
                    }
                }
                CollectionStateExtra::Export(_) => {
                    // For now, sinks are always considered hydrated. We rely on
                    // them starting up "instantly" and don't wait for them when
                    // checking hydration status of a replica.  TODO(sinks):
                    // base this off of the sink shard's frontier?
                    true
                }
                CollectionStateExtra::None => {
                    // For now, objects that are not ingestions are always
                    // considered hydrated. These are tables and webhooks, as of
                    // today.
                    true
                }
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
                // We continue with our loop instead of breaking out early, so
                // that we log all non-hydrated collections.
            }
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        // In theory, we could pull all our frontiers from storage collections...
        // but in practice those frontiers may not be identical. For historical reasons, we use the
        // locally-tracked frontier for sinks but the storage-collections-maintained frontier for
        // sources.
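        // Note that the `retain` closure below has a side effect: sinks are handled (and
        // pushed into `result`) right here, and returning `false` removes them from `ids`,
        // so only the remaining collections are queried from `storage_collections`.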
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestions(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        Box::new(self.instances[&instance_id].active_ingestions())
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            metrics,
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
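        // Bring the new instance up to date with the controller's current lifecycle
        // state (initialization and write permission) and configuration.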
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        enable_ctp: bool,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
            enable_ctp,
        };
        instance.add_replica(replica_id, config);
    }

    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

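        // Synthesize `Paused` status updates for objects running on the dropped
        // replica, so the status history collections reflect that this replica is
        // no longer running them.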
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for id in instance.active_ingestions() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // NOTE(aljoscha): We filter out the remap collection because we
            // don't get any status updates about it from the replica side. So
            // we don't want to synthesize a 'paused' status here.
            //
            // TODO(aljoscha): I think we want to fix this eventually, and make
            // sure we get status updates for the remap shard as well. Currently
            // its handling in the source status collection is a bit difficult
            // because we don't have updates for it in the status history
            // collection.
            let subsource_ids = ingestion_description
                .collection_ids()
                .filter(|id| id != &ingestion_description.remap_collection_id);
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
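            // Try to evolve the registered schema to the new relation description;
            // persist rejects the change if it is not a compatible evolution.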
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

    /// Create and "execute" the described collection.
    ///
    /// "Execute" is in scare quotes because what executing a collection means
    /// varies widely based on the type of collection you're creating.
    ///
    /// The general process creating a collection undergoes is:
    /// 1. Enrich the description we get from the user with the metadata only
    ///    the storage controller knows about. This is mostly a matter of
    ///    separating concerns.
    /// 2. Generate write and read persist handles for the collection.
    /// 3. Store the collection's metadata in the appropriate field.
    /// 4. "Execute" the collection. What that means is contingent on the type of
    ///    collection, so consult the code for more details.
    ///
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods. @guswynn observes
    // that a number of these operations could be moved into fns on
    // `DataSource`.
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // At this point we're connected to all the collection shards in persist. Our warming task
        // is no longer useful, so abort it if it's still running.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state. We must not:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
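        // `dedup` only removes entries that are exactly equal, so any adjacent entries
        // that still share an ID must have different descriptions; reject those below.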
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
                    Ok(shard)
                };

                let remap_shard = match &description.data_source {
                    // Only ingestions can have remap shards.
                    DataSource::Ingestion(IngestionDescription {
                        remap_collection_id,
                        ..
                    }) => {
                        // Iff the ingestion has a remap collection, its metadata must
                        // exist (and be correct) by this point.
                        Some(get_shard(*remap_collection_id)?)
                    }
                    _ => None,
                };

                // If the shard is being managed by txn-wal (initially, tables), then we need to
                // pass along the shard id for the txns shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    remap_shard,
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open persist handles for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Reborrow the `&mut self` as immutable, since the concurrent work to be processed in
        // this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down somewhere
                    debug!(
                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
                        id, metadata.remap_shard, metadata.data_shard
                    );

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered` (which
            // `buffer_unordered` uses underneath):
            // - One is captured here <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a `FuturesUnordered`
            // stream attempts to obtain an async mutex that is also obtained in the futures
            // being polled.
            //
            // Both of these could potentially be issues in all usages of `buffer_unordered` in
            // this method, so we stick to the standard advice: only use `try_collect` or
            // `collect`!
            .try_collect()
            .await?;

        // The set of collections that we should render at the end of this
        // function.
        let mut to_execute = BTreeSet::new();
        // New collections that are being created; this is distinct from the set
        // of collections we plan to execute because
        // `DataSource::IngestionExport` is added as a new collection, but is
        // not executed directly.
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        // Reorder in dependency order.
        to_register.sort_by_key(|(id, ..)| *id);

        // Register tables first, but register them in reverse order since earlier tables
        // can depend on later tables.
        //
        // Note: We could do more complex sorting to avoid the allocations, but IMO it's
        // easier to reason about it this way.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics need a level of indirection so we can mutably borrow
        // `self` when registering collections and when we are inserting
        // statistics.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
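            // A collection counts as being in txn-wal if it has a txns shard, except
            // for migrated tables in read-only mode, whose shards are written directly
            // (see the table-registration logic below).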
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let mut data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            // Ensure that the ingestion has an export for its primary source if applicable.
            // This is done in an awkward spot to appease the borrow checker.
            // TODO(database-issues#8620): This will be removed once sources no longer export
            // to primary collections and only export to explicit SourceExports (tables).
            if let DataSource::Ingestion(ingestion) = &mut data_source {
                let export = ingestion.desc.primary_source_export();
                ingestion.source_exports.insert(id, export);
            }

            let write_frontier = write.upper();

            // Determine if this collection has another dependency.
            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            // Assert some invariants.
            //
            // TODO(alter_table): Include Tables (is_in_txns) in this check. After
            // supporting ALTER TABLE, it's now possible for a table to have a dependency
            // and thus run this check. But tables are managed by txn-wal and thus the
            // upper of the shard's `write_handle` generally isn't the logical upper of
            // the shard. Instead we need to thread through the upper of the `txn` shard
            // here so we can check this invariant.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                // As the halt message says, this can happen when trying to come
                // up in read-only mode and the current read-write
                // environmentd/controller deletes a collection. We exit
                // gracefully, which means we'll get restarted and get to try
                // again.
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                // The dependency since cannot be beyond the dependent (our)
                // upper unless the collection is new. In practice, the
                // dependency is the remap shard of a source (export), and if
                // the since is allowed to "catch up" to the upper, that is
                // `upper <= since`, a restarting ingestion cannot differentiate
                // between updates that have already been written out to the
                // backing persist shard and updates that have yet to be
                // written. We would write duplicate updates.
                //
                // If this check fails, it means that the read hold installed on
                // the dependency was probably not upheld –– if it were, the
                // dependency's since could not have advanced as far as the
                // dependent's upper.
                //
                // We don't care about the dependency since when the write
                // frontier is empty. In that case, no-one can write down any
                // more updates.
                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                            dependent ({id}): upper {:?} \n
                            dependency since {:?} \n
                            dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            // Perform data source-specific setup.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    // We always register the collection with the collection manager,
                    // regardless of read-only mode. The CollectionManager itself is
                    // aware of read-only mode and will not attempt to write before told
                    // to do so.
                    //
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    // This collection of statistics is periodically aggregated into
                    // `source_statistics`.
                    new_webhook_statistic_entries.insert(id);
                    // Register the collection so our manager knows about it.
                    //
                    // NOTE: Maybe this shouldn't be in the collection manager,
                    // and collection manager should only be responsible for
                    // built-in introspection collections?
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            // Record the ingestion's cluster ID for the
                            // ingestion export. This way we always have a
                            // record of it, even if the ingestion's collection
                            // description disappears.
                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the source export doesn't do anything; ensure we execute the source instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

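                    // A sink has exactly two dependencies, a hold on its own
                    // collection and one on its input, so we expect exactly two
                    // read holds here.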
                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            // Webhooks don't run on clusters/replicas, so we initialize their
            // statistics collection here.
            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }

            // Sources and sinks only have statistics in the collection when
            // there is a replica that is reporting them. No need to initialize
            // here.
        }

        // Register the tables all in one batch.
        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we use a special read-only table worker
                // that allows writing to migrated tables and will continually
                // bump their shard upper so that it tracks the txn shard upper.
                // We do this, so that they remain readable at a recent
                // timestamp, which in turn allows dataflows that depend on them
                // to (re-)hydrate.
                //
                // We only want to register migrated tables, though, and leave
                // existing tables out/never write to them in read-only mode.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // TODO(guswynn): perform the io in this final section concurrently.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table { .. }
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }
1248
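    /// Checks whether the ingestion identified by `ingestion_id` can be altered to use
    /// `source_desc`, i.e. whether its current description is alter-compatible with the new one.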
1249    fn check_alter_ingestion_source_desc(
1250        &mut self,
1251        ingestion_id: GlobalId,
1252        source_desc: &SourceDesc,
1253    ) -> Result<(), StorageError<Self::Timestamp>> {
1254        let source_collection = self.collection(ingestion_id)?;
1255        let data_source = &source_collection.data_source;
1256        match &data_source {
1257            DataSource::Ingestion(cur_ingestion) => {
1258                cur_ingestion
1259                    .desc
1260                    .alter_compatible(ingestion_id, source_desc)?;
1261            }
1262            o => {
1263                tracing::info!(
1264                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1265                    o
1266                );
1267                Err(AlterError { id: ingestion_id })?
1268            }
1269        }
1270
1271        Ok(())
1272    }
1273
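    /// Updates the stored connection of each given ingestion (and informs `StorageCollections`),
    /// then re-runs every ingestion whose connection actually changed.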
1274    async fn alter_ingestion_connections(
1275        &mut self,
1276        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1277    ) -> Result<(), StorageError<Self::Timestamp>> {
1278        // Also have to let StorageCollections know!
1279        self.storage_collections
1280            .alter_ingestion_connections(source_connections.clone())
1281            .await?;
1282
1283        let mut ingestions_to_run = BTreeSet::new();
1284
1285        for (id, conn) in source_connections {
1286            let collection = self
1287                .collections
1288                .get_mut(&id)
1289                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1290
1291            match &mut collection.data_source {
1292                DataSource::Ingestion(ingestion) => {
1293                    // If the connection hasn't changed, there's no sense in
1294                    // re-rendering the dataflow.
1295                    if ingestion.desc.connection != conn {
1296                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1297                        ingestion.desc.connection = conn;
1298                        ingestions_to_run.insert(id);
1299                    } else {
1300                        tracing::warn!(
1301                            "alter_ingestion_connections called on {id} but the \
1302                            connection was the same"
1303                        );
1304                    }
1305                }
1306                o => {
1307                    tracing::warn!("alter_ingestion_connections called on {:?}", o);
1308                    Err(StorageError::IdentifierInvalid(id))?;
1309                }
1310            }
1311        }
1312
1313        for id in ingestions_to_run {
1314            self.run_ingestion(id)?;
1315        }
1316        Ok(())
1317    }
1318
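    /// Updates the data config of each given source export, both on the export collection itself
    /// and on the ingestion it belongs to, then re-runs the affected ingestions.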
1319    async fn alter_ingestion_export_data_configs(
1320        &mut self,
1321        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1322    ) -> Result<(), StorageError<Self::Timestamp>> {
1323        // Also have to let StorageCollections know!
1324        self.storage_collections
1325            .alter_ingestion_export_data_configs(source_exports.clone())
1326            .await?;
1327
1328        let mut ingestions_to_run = BTreeSet::new();
1329
1330        for (source_export_id, new_data_config) in source_exports {
1331            // We need to adjust the data config on the CollectionState for
1332            // the source export collection directly
1333            let source_export_collection = self
1334                .collections
1335                .get_mut(&source_export_id)
1336                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1337            let ingestion_id = match &mut source_export_collection.data_source {
1338                DataSource::IngestionExport {
1339                    ingestion_id,
1340                    details: _,
1341                    data_config,
1342                } => {
1343                    *data_config = new_data_config.clone();
1344                    *ingestion_id
1345                }
1346                o => {
1347                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1348                    Err(StorageError::IdentifierInvalid(source_export_id))?
1349                }
1350            };
1351            // We also need to adjust the data config on the CollectionState of the
1352            // Ingestion that the export is associated with.
1353            let ingestion_collection = self
1354                .collections
1355                .get_mut(&ingestion_id)
1356                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
1357
1358            match &mut ingestion_collection.data_source {
1359                DataSource::Ingestion(ingestion_desc) => {
1360                    let source_export = ingestion_desc
1361                        .source_exports
1362                        .get_mut(&source_export_id)
1363                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1364
1365                    // If the data config hasn't changed, there's no sense in
1366                    // re-rendering the dataflow.
1367                    if source_export.data_config != new_data_config {
1368                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
1369                        source_export.data_config = new_data_config;
1370
1371                        ingestions_to_run.insert(ingestion_id);
1372                    } else {
1373                        tracing::warn!(
1374                            "alter_ingestion_export_data_configs called on \
1375                                    export {source_export_id} of {ingestion_id} but \
1376                                    the data config was the same"
1377                        );
1378                    }
1379                }
1380                o => {
1381                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1382                    Err(StorageError::IdentifierInvalid(ingestion_id))?
1383                }
1384            }
1385        }
1386
1387        for id in ingestions_to_run {
1388            self.run_ingestion(id)?;
1389        }
1390        Ok(())
1391    }
1392
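    /// Evolves the schema of a table: registers `new_collection` with `new_desc`, backed by the
    /// same data shard as `existing_collection`, and repoints the existing collection at the new
    /// primary collection.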
1393    async fn alter_table_desc(
1394        &mut self,
1395        existing_collection: GlobalId,
1396        new_collection: GlobalId,
1397        new_desc: RelationDesc,
1398        expected_version: RelationVersion,
1399        register_ts: Self::Timestamp,
1400    ) -> Result<(), StorageError<Self::Timestamp>> {
1401        let data_shard = {
1402            let Controller {
1403                collections,
1404                storage_collections,
1405                ..
1406            } = self;
1407
1408            let existing = collections
1409                .get(&existing_collection)
1410                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
1411            if !matches!(existing.data_source, DataSource::Table { .. }) {
1412                return Err(StorageError::IdentifierInvalid(existing_collection));
1413            }
1414
1415            // Let StorageCollections know!
1416            storage_collections
1417                .alter_table_desc(
1418                    existing_collection,
1419                    new_collection,
1420                    new_desc.clone(),
1421                    expected_version,
1422                )
1423                .await?;
1424
1425            existing.collection_metadata.data_shard.clone()
1426        };
1427
1428        let persist_client = self
1429            .persist
1430            .open(self.persist_location.clone())
1431            .await
1432            .expect("invalid persist location");
1433        let write_handle = self
1434            .open_data_handles(
1435                &existing_collection,
1436                data_shard,
1437                new_desc.clone(),
1438                &persist_client,
1439            )
1440            .await;
1441
1442        // Note: The new collection is now the "primary collection" so we specify `None` here.
1443        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
1444        let collection_meta = CollectionMetadata {
1445            persist_location: self.persist_location.clone(),
1446            data_shard,
1447            relation_desc: new_desc.clone(),
1448            // TODO(alter_table): Support schema evolution on sources.
1449            remap_shard: None,
1450            txns_shard: Some(self.txns_read.txns_id().clone()),
1451        };
1452        // TODO(alter_table): Support schema evolution on sources.
1453        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
1454        let collection_state = CollectionState::new(
1455            collection_desc.data_source.clone(),
1456            collection_meta,
1457            CollectionStateExtra::None,
1458            wallclock_lag_metrics,
1459        );
1460
1461        // Great! We have successfully evolved the schema of our Table; now we need to update our
1462        // in-memory data structures.
1463        self.collections.insert(new_collection, collection_state);
1464        let existing = self
1465            .collections
1466            .get_mut(&existing_collection)
1467            .expect("missing existing collection");
1468        assert!(matches!(
1469            existing.data_source,
1470            DataSource::Table { primary: None }
1471        ));
1472        existing.data_source = DataSource::Table {
1473            primary: Some(new_collection),
1474        };
1475
1476        self.persist_table_worker
1477            .register(register_ts, vec![(new_collection, write_handle)])
1478            .await
1479            .expect("table worker unexpectedly shut down");
1480
1481        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);
1482
1483        Ok(())
1484    }
1485
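    /// Returns the `ExportState` of `id`, or `IdentifierMissing` if the collection does not
    /// exist or is not an export.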
1486    fn export(
1487        &self,
1488        id: GlobalId,
1489    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1490        self.collections
1491            .get(&id)
1492            .and_then(|c| match &c.extra_state {
1493                CollectionStateExtra::Export(state) => Some(state),
1494                _ => None,
1495            })
1496            .ok_or(StorageError::IdentifierMissing(id))
1497    }
1498
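    /// Like [`Self::export`], but returns a mutable reference to the `ExportState`.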
1499    fn export_mut(
1500        &mut self,
1501        id: GlobalId,
1502    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1503        self.collections
1504            .get_mut(&id)
1505            .and_then(|c| match &mut c.extra_state {
1506                CollectionStateExtra::Export(state) => Some(state),
1507                _ => None,
1508            })
1509            .ok_or(StorageError::IdentifierMissing(id))
1510    }
1511
1512    /// Create a oneshot ingestion.
1513    async fn create_oneshot_ingestion(
1514        &mut self,
1515        ingestion_id: uuid::Uuid,
1516        collection_id: GlobalId,
1517        instance_id: StorageInstanceId,
1518        request: OneshotIngestionRequest,
1519        result_tx: OneshotResultCallback<ProtoBatch>,
1520    ) -> Result<(), StorageError<Self::Timestamp>> {
1521        let collection_meta = self
1522            .collections
1523            .get(&collection_id)
1524            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1525            .collection_metadata
1526            .clone();
1527        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1528            // TODO(cf2): Refine this error.
1529            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1530        })?;
1531        let oneshot_cmd = RunOneshotIngestion {
1532            ingestion_id,
1533            collection_id,
1534            collection_meta,
1535            request,
1536        };
1537
1538        if !self.read_only {
1539            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1540            let pending = PendingOneshotIngestion {
1541                result_tx,
1542                cluster_id: instance_id,
1543            };
1544            let novel = self
1545                .pending_oneshot_ingestions
1546                .insert(ingestion_id, pending);
1547            assert_none!(novel);
1548            Ok(())
1549        } else {
1550            Err(StorageError::ReadOnly)
1551        }
1552    }
1553
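    /// Cancels a pending oneshot ingestion: removes its pending state, tells the owning cluster
    /// (if it still exists) to cancel the ingestion, and responds to the caller that the request
    /// has been canceled.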
1554    fn cancel_oneshot_ingestion(
1555        &mut self,
1556        ingestion_id: uuid::Uuid,
1557    ) -> Result<(), StorageError<Self::Timestamp>> {
1558        if self.read_only {
1559            return Err(StorageError::ReadOnly);
1560        }
1561
1562        let pending = self
1563            .pending_oneshot_ingestions
1564            .remove(&ingestion_id)
1565            .ok_or_else(|| {
1566                // TODO(cf2): Refine this error.
1567                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1568            })?;
1569
1570        match self.instances.get_mut(&pending.cluster_id) {
1571            Some(instance) => {
1572                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1573            }
1574            None => {
1575                mz_ore::soft_panic_or_log!(
1576                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1577                    ingestion_id,
1578                    pending.cluster_id,
1579                );
1580            }
1581        }
1582        // Respond to the user that the request has been canceled.
1583        pending.cancel();
1584
1585        Ok(())
1586    }
1587
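    /// Replaces the description of export `id`: re-acquires read holds on the sink's input and
    /// on the export itself, rebuilds the `ExportState`, and sends an updated sink command to
    /// the export's cluster.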
1588    async fn alter_export(
1589        &mut self,
1590        id: GlobalId,
1591        new_description: ExportDescription<Self::Timestamp>,
1592    ) -> Result<(), StorageError<Self::Timestamp>> {
1593        let from_id = new_description.sink.from;
1594
1595        // Acquire read holds at StorageCollections to ensure that the
1596        // sinked collection is not dropped while we're sinking it.
1597        let desired_read_holds = vec![from_id.clone(), id.clone()];
1598        let [input_hold, self_hold] = self
1599            .storage_collections
1600            .acquire_read_holds(desired_read_holds)
1601            .expect("missing dependency")
1602            .try_into()
1603            .expect("expected number of holds");
1604        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1605        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1606
1607        // Check whether the sink's write frontier is beyond the read hold we got
1608        let cur_export = self.export_mut(id)?;
1609        let input_readable = cur_export
1610            .write_frontier
1611            .iter()
1612            .all(|t| input_hold.since().less_than(t));
1613        if !input_readable {
1614            return Err(StorageError::ReadBeforeSince(from_id));
1615        }
1616
1617        let new_export = ExportState {
1618            read_capabilities: cur_export.read_capabilities.clone(),
1619            cluster_id: new_description.instance_id,
1620            derived_since: cur_export.derived_since.clone(),
1621            read_holds: [input_hold, self_hold],
1622            read_policy: cur_export.read_policy.clone(),
1623            write_frontier: cur_export.write_frontier.clone(),
1624        };
1625        *cur_export = new_export;
1626
1627        let cmd = RunSinkCommand {
1628            id,
1629            description: StorageSinkDesc {
1630                from: from_id,
1631                from_desc: new_description.sink.from_desc,
1632                connection: new_description.sink.connection,
1633                envelope: new_description.sink.envelope,
1634                as_of: new_description.sink.as_of,
1635                version: new_description.sink.version,
1636                from_storage_metadata,
1637                with_snapshot: new_description.sink.with_snapshot,
1638                to_storage_metadata,
1639            },
1640        };
1641
1642        // Fetch the client for this export's cluster.
1643        let instance = self
1644            .instances
1645            .get_mut(&new_description.instance_id)
1646            .ok_or_else(|| StorageError::ExportInstanceMissing {
1647                storage_instance_id: new_description.instance_id,
1648                export_id: id,
1649            })?;
1650
1651        instance.send(StorageCommand::RunSink(Box::new(cmd)));
1652        Ok(())
1653    }
1654
1655    /// Alters the connections of the given exports and re-renders their sinks with the new connections.
1656    async fn alter_export_connections(
1657        &mut self,
1658        exports: BTreeMap<GlobalId, StorageSinkConnection>,
1659    ) -> Result<(), StorageError<Self::Timestamp>> {
1660        let mut updates_by_instance =
1661            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();
1662
1663        for (id, connection) in exports {
1664            // We stage changes in new_export_description and then apply all
1665            // updates to exports at the end.
1666            //
1667            // We don't just go ahead and clone the `ExportState` itself and
1668            // update that because `ExportState` is not clone, because it holds
1669            // a `ReadHandle` and cloning that would cause additional work for
1670            // whoever guarantees those read holds.
1671            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
1672                let export = &self.collections[&id];
1673                let DataSource::Sink { desc } = &export.data_source else {
1674                    panic!("export known to exist")
1675                };
1676                let CollectionStateExtra::Export(state) = &export.extra_state else {
1677                    panic!("export known to exist")
1678                };
1679                let export_description = desc.clone();
1680                let as_of = state.input_hold().since().clone();
1681
1682                (export_description, as_of)
1683            };
1684            let current_sink = new_export_description.sink.clone();
1685
1686            new_export_description.sink.connection = connection;
1687
1688            // Ensure compatibility
1689            current_sink.alter_compatible(id, &new_export_description.sink)?;
1690
1691            let from_storage_metadata = self
1692                .storage_collections
1693                .collection_metadata(new_export_description.sink.from)?;
1694            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1695
1696            let cmd = RunSinkCommand {
1697                id,
1698                description: StorageSinkDesc {
1699                    from: new_export_description.sink.from,
1700                    from_desc: new_export_description.sink.from_desc.clone(),
1701                    connection: new_export_description.sink.connection.clone(),
1702                    envelope: new_export_description.sink.envelope,
1703                    with_snapshot: new_export_description.sink.with_snapshot,
1704                    version: new_export_description.sink.version,
1705                    // Here we are about to send a RunSinkCommand with the current read capability
1706                    // held by this sink. However, clusters are already running a version of the
1707                    // sink and nothing guarantees that by the time this command arrives at the
1708                    // clusters they won't have made additional progress such that this read
1709                    // capability is invalidated.
1710                    // The solution to this problem is for the controller to track specific
1711                    // executions of dataflows such that it can track the shutdown of the current
1712                    // instance and the initialization of the new instance separately and ensure
1713                    // read holds are held for the correct amount of time.
1714                    // TODO(petrosagg): change the controller to explicitly track dataflow executions
1715                    as_of: as_of.to_owned(),
1716                    from_storage_metadata,
1717                    to_storage_metadata,
1718                },
1719            };
1720
1721            let update = updates_by_instance
1722                .entry(new_export_description.instance_id)
1723                .or_default();
1724            update.push((cmd, new_export_description));
1725        }
1726
1727        for (instance_id, updates) in updates_by_instance {
1728            let mut export_updates = BTreeMap::new();
1729            let mut cmds = Vec::with_capacity(updates.len());
1730
1731            for (cmd, export_state) in updates {
1732                export_updates.insert(cmd.id, export_state);
1733                cmds.push(cmd);
1734            }
1735
1736            // Fetch the client for this export's cluster.
1737            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1738                StorageError::ExportInstanceMissing {
1739                    storage_instance_id: instance_id,
1740                    export_id: *export_updates
1741                        .keys()
1742                        .next()
1743                        .expect("set of exports not empty"),
1744                }
1745            })?;
1746
1747            for cmd in cmds {
1748                instance.send(StorageCommand::RunSink(Box::new(cmd)));
1749            }
1750
1751            // Update state only after all possible errors have occurred.
1752            for (id, new_export_description) in export_updates {
1753                let Some(state) = self.collections.get_mut(&id) else {
1754                    panic!("export known to exist")
1755                };
1756                let DataSource::Sink { desc } = &mut state.data_source else {
1757                    panic!("export known to exist")
1758                };
1759                *desc = new_export_description;
1760            }
1761        }
1762
1763        Ok(())
1764    }
1765
1766    // Dropping a table takes roughly the following flow:
1767    //
1768    // First determine if this is a TableWrites table or a source-fed table (an IngestionExport):
1769    //
1770    // If this is a TableWrites table:
1771    //   1. We remove the table from the persist table write worker.
1772    //   2. The table removal is awaited in an async task.
1773    //   3. A message is sent to the storage controller that the table has been removed from the
1774    //      table write worker.
1775    //   4. The controller drains all table drop messages during `process`.
1776    //   5. `process` calls `drop_sources` with the dropped tables.
1777    //
1778    // If this is an IngestionExport table:
1779    //   1. We validate the ids and then call drop_sources_unvalidated to proceed dropping.
1780    fn drop_tables(
1781        &mut self,
1782        storage_metadata: &StorageMetadata,
1783        identifiers: Vec<GlobalId>,
1784        ts: Self::Timestamp,
1785    ) -> Result<(), StorageError<Self::Timestamp>> {
1786        // Collect tables by their data_source
1787        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1788            .into_iter()
1789            .partition(|id| match self.collections[id].data_source {
1790                DataSource::Table { .. } => true,
1791                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1792                _ => panic!("identifier is not a table: {}", id),
1793            });
1794
1795        // Drop table write tables
1796        if !table_write_ids.is_empty() {
1797            let drop_notif = self
1798                .persist_table_worker
1799                .drop_handles(table_write_ids.clone(), ts);
1800            let tx = self.pending_table_handle_drops_tx.clone();
1801            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1802                drop_notif.await;
1803                for identifier in table_write_ids {
1804                    let _ = tx.send(identifier);
1805                }
1806            });
1807        }
1808
1809        // Drop source-fed tables
1810        if !data_source_ids.is_empty() {
1811            self.validate_collection_ids(data_source_ids.iter().cloned())?;
1812            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1813        }
1814
1815        Ok(())
1816    }
1817
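    /// Validates that `identifiers` refer to known collections, then drops them via
    /// [`Self::drop_sources_unvalidated`].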
1818    fn drop_sources(
1819        &mut self,
1820        storage_metadata: &StorageMetadata,
1821        identifiers: Vec<GlobalId>,
1822    ) -> Result<(), StorageError<Self::Timestamp>> {
1823        self.validate_collection_ids(identifiers.iter().cloned())?;
1824        self.drop_sources_unvalidated(storage_metadata, identifiers)
1825    }
1826
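    /// Drops the given source collections without validating the identifiers: re-runs ingestions
    /// that lose exports, releases read holds by installing empty-frontier read policies, removes
    /// shard mappings and statistics, records `Dropped` status updates, and cleans up in-memory
    /// collection state.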
1827    fn drop_sources_unvalidated(
1828        &mut self,
1829        storage_metadata: &StorageMetadata,
1830        ids: Vec<GlobalId>,
1831    ) -> Result<(), StorageError<Self::Timestamp>> {
1832        // Keep track of which ingestions we have to execute still, and which we
1833        // have to change because of dropped subsources.
1834        let mut ingestions_to_execute = BTreeSet::new();
1835        let mut ingestions_to_drop = BTreeSet::new();
1836        let mut source_statistics_to_drop = Vec::new();
1837
1838        // Ingestions (and their exports) are also collections, but we keep
1839        // track of non-ingestion collections separately, because they have
1840        // slightly different cleanup logic below.
1841        let mut collections_to_drop = Vec::new();
1842
1843        for id in ids.iter() {
1844            let metadata = storage_metadata.get_collection_shard::<T>(*id);
1845            mz_ore::soft_assert_or_log!(
1846                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
1847                "dropping {id}, but drop was not synchronized with storage \
1848                controller via `synchronize_collections`"
1849            );
1850
1851            let collection_state = self.collections.get(id);
1852
1853            if let Some(collection_state) = collection_state {
1854                match collection_state.data_source {
1855                    DataSource::Webhook => {
1856                        // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
1857                        // could probably use some love and maybe get merged together?
1858                        let fut = self.collection_manager.unregister_collection(*id);
1859                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);
1860
1861                        collections_to_drop.push(*id);
1862                        source_statistics_to_drop.push(*id);
1863                    }
1864                    DataSource::Ingestion(_) => {
1865                        ingestions_to_drop.insert(*id);
1866                        source_statistics_to_drop.push(*id);
1867                    }
1868                    DataSource::IngestionExport { ingestion_id, .. } => {
1869                        // If we are dropping source exports, we need to modify the
1870                        // ingestion that it runs on.
1871                        //
1872                        // If we remove this export, we need to stop producing data to
1873                        // it, so plan to re-execute the ingestion with the amended
1874                        // description.
1875                        ingestions_to_execute.insert(ingestion_id);
1876
1877                        // Adjust the source to remove this export.
1878                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
1879                            Some(ingestion_collection) => ingestion_collection,
1880                            // Primary ingestion already dropped.
1881                            None => {
1882                                tracing::error!(
1883                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
1884                                );
1885                                continue;
1886                            }
1887                        };
1888
1889                        match &mut ingestion_state.data_source {
1890                            DataSource::Ingestion(ingestion_desc) => {
1891                                let removed = ingestion_desc.source_exports.remove(id);
1892                                mz_ore::soft_assert_or_log!(
1893                                    removed.is_some(),
1894                                    "dropped subsource {id} already removed from source exports"
1895                                );
1896                            }
1897                            _ => unreachable!(
1898                                "SourceExport must only refer to primary sources that already exist"
1899                            ),
1900                        };
1901
1902                        // Ingestion exports also have ReadHolds that we need to
1903                        // downgrade, and much of their drop machinery is the
1904                        // same as for the "main" ingestion.
1905                        ingestions_to_drop.insert(*id);
1906                        source_statistics_to_drop.push(*id);
1907                    }
1908                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
1909                        collections_to_drop.push(*id);
1910                    }
1911                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
1912                        // Collections of these types are either not sources and should be dropped
1913                        // through other means, or are sources but should never be dropped.
1914                        soft_panic_or_log!(
1915                            "drop_sources called on a {:?} (id={id})",
1916                            collection_state.data_source,
1917                        );
1918                    }
1919                }
1920            }
1921        }
1922
1923        // Do not bother re-executing ingestions we know we plan to drop.
1924        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
1925        for ingestion_id in ingestions_to_execute {
1926            self.run_ingestion(ingestion_id)?;
1927        }
1928
1929        // For ingestions, we fabricate a new hold that will propagate through
1930        // the cluster and then back to us.
1931
1932        // We don't explicitly remove read capabilities! Downgrading the
1933        // frontier of the source to `[]` (the empty Antichain), will propagate
1934        // to the storage dependencies.
1935        let ingestion_policies = ingestions_to_drop
1936            .iter()
1937            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
1938            .collect();
1939
1940        tracing::debug!(
1941            ?ingestion_policies,
1942            "dropping sources by setting read hold policies"
1943        );
1944        self.set_hold_policies(ingestion_policies);
1945
1946        // Delete all collection->shard mappings
1947        let shards_to_update: BTreeSet<_> = ingestions_to_drop
1948            .iter()
1949            .chain(collections_to_drop.iter())
1950            .cloned()
1951            .collect();
1952        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);
1953
1954        let status_now = mz_ore::now::to_datetime((self.now)());
1955        let mut status_updates = vec![];
1956        for id in ingestions_to_drop.iter() {
1957            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
1958        }
1959
1960        if !self.read_only {
1961            self.append_status_introspection_updates(
1962                IntrospectionType::SourceStatusHistory,
1963                status_updates,
1964            );
1965        }
1966
1967        {
1968            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1969            for id in source_statistics_to_drop {
1970                source_statistics
1971                    .source_statistics
1972                    .retain(|(stats_id, _), _| stats_id != &id);
1973                source_statistics
1974                    .webhook_statistics
1975                    .retain(|stats_id, _| stats_id != &id);
1976            }
1977        }
1978
1979        // Remove collection state
1980        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
1981            tracing::info!(%id, "dropping collection state");
1982            let collection = self
1983                .collections
1984                .remove(id)
1985                .expect("list populated after checking that self.collections contains it");
1986
1987            let instance = match &collection.extra_state {
1988                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
1989                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
1990                CollectionStateExtra::None => None,
1991            }
1992            .and_then(|i| self.instances.get(&i));
1993
1994            // Record which replicas were running a collection, so that we can
1995            // match DroppedId messages against them and eventually remove state
1996            // from self.dropped_objects
1997            if let Some(instance) = instance {
1998                let active_replicas = instance.get_active_replicas_for_object(id);
1999                if !active_replicas.is_empty() {
2000                    // The remap collection of an ingestion doesn't have extra
2001                    // state and doesn't have an instance_id, but we still get
2002                    // upper updates for it, so we want to make sure to populate
2003                    // `dropped_objects`.
2004                    // TODO(aljoscha): All this is a bit icky. But here we are
2005                    // for now...
2006                    if let DataSource::Ingestion(ingestion_desc) = &collection.data_source {
2007                        self.dropped_objects.insert(
2008                            ingestion_desc.remap_collection_id,
2009                            active_replicas.clone(),
2010                        );
2011                    }
2015
2016                    self.dropped_objects.insert(*id, active_replicas);
2017                }
2018            }
2019        }
2020
2021        // Also let StorageCollections know!
2022        self.storage_collections
2023            .drop_collections_unvalidated(storage_metadata, ids);
2024
2025        Ok(())
2026    }
2027
2028    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
2029    fn drop_sinks(
2030        &mut self,
2031        storage_metadata: &StorageMetadata,
2032        identifiers: Vec<GlobalId>,
2033    ) -> Result<(), StorageError<Self::Timestamp>> {
2034        self.validate_export_ids(identifiers.iter().cloned())?;
2035        self.drop_sinks_unvalidated(storage_metadata, identifiers);
2036        Ok(())
2037    }
2038
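    /// Drops the given sinks without validating the identifiers: releases their read holds by
    /// installing empty-frontier read policies, records `Dropped` status updates, removes their
    /// statistics, and cleans up in-memory export state.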
2039    fn drop_sinks_unvalidated(
2040        &mut self,
2041        storage_metadata: &StorageMetadata,
2042        mut sinks_to_drop: Vec<GlobalId>,
2043    ) {
2044        // Ignore exports that have already been removed.
2045        sinks_to_drop.retain(|id| self.export(*id).is_ok());
2046
2047        // TODO: ideally we'd advance the write frontier ourselves here, but this function's
2048        // not yet marked async.
2049
2050        // We don't explicitly remove read capabilities! Downgrading the
2051        // frontier of the source to `[]` (the empty Antichain), will propagate
2052        // to the storage dependencies.
2053        let drop_policy = sinks_to_drop
2054            .iter()
2055            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2056            .collect();
2057
2058        tracing::debug!(
2059            ?drop_policy,
2060            "dropping sinks by setting read hold policies"
2061        );
2062        self.set_hold_policies(drop_policy);
2063
2064        // Record the drop status for all sink drops.
2065        //
2066        // We also delete the items' statistics objects.
2067        //
2068        // The locks are held for a short time, only while we do some removals from a map.
2069
2070        let status_now = mz_ore::now::to_datetime((self.now)());
2071
2072        // Record the drop status for all pending sink drops.
2073        let mut status_updates = vec![];
2074        {
2075            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2076            for id in sinks_to_drop.iter() {
2077                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2078                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
2079            }
2080        }
2081
2082        if !self.read_only {
2083            self.append_status_introspection_updates(
2084                IntrospectionType::SinkStatusHistory,
2085                status_updates,
2086            );
2087        }
2088
2089        // Remove collection/export state
2090        for id in sinks_to_drop.iter() {
2091            tracing::info!(%id, "dropping export state");
2092            let collection = self
2093                .collections
2094                .remove(id)
2095                .expect("list populated after checking that self.collections contains it");
2096
2097            let instance = match &collection.extra_state {
2098                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2099                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2100                CollectionStateExtra::None => None,
2101            }
2102            .and_then(|i| self.instances.get(&i));
2103
2104            // Record which replicas were running an export, so that we can
2105            // match `DroppedId` messages against it and eventually remove state
2106            // from `self.dropped_objects`.
2107            if let Some(instance) = instance {
2108                let active_replicas = instance.get_active_replicas_for_object(id);
2109                if !active_replicas.is_empty() {
2110                    self.dropped_objects.insert(*id, active_replicas);
2111                }
2112            }
2113        }
2114
2115        // Also let StorageCollections know!
2116        self.storage_collections
2117            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2118    }
2119
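    /// Appends `commands` to the corresponding tables at `write_ts` and advances their uppers to
    /// `advance_to`, returning a receiver that resolves once the append has completed. In
    /// read-only mode, only migrated system collections may be written.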
2120    #[instrument(level = "debug")]
2121    fn append_table(
2122        &mut self,
2123        write_ts: Self::Timestamp,
2124        advance_to: Self::Timestamp,
2125        commands: Vec<(GlobalId, Vec<TableData>)>,
2126    ) -> Result<
2127        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2128        StorageError<Self::Timestamp>,
2129    > {
2130        if self.read_only {
2131            // While in read only mode, ONLY collections that have been migrated
2132            // and need to be re-hydrated in read only mode can be written to.
2133            if !commands
2134                .iter()
2135                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2136            {
2137                return Err(StorageError::ReadOnly);
2138            }
2139        }
2140
2141        // TODO(petrosagg): validate appends against the expected RelationDesc of the collection
2142        for (id, updates) in commands.iter() {
2143            if !updates.is_empty() && !write_ts.less_than(&advance_to) {
2144                return Err(StorageError::UpdateBeyondUpper(*id));
2145            }
2148        }
2149
2150        Ok(self
2151            .persist_table_worker
2152            .append(write_ts, advance_to, commands))
2153    }
2154
2155    fn monotonic_appender(
2156        &self,
2157        id: GlobalId,
2158    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
2159        self.collection_manager.monotonic_appender(id)
2160    }
2161
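    /// Returns the shared statistics object for the webhook source `id`.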
2162    fn webhook_statistics(
2163        &self,
2164        id: GlobalId,
2165    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2166        // Calls to this method are usually cached, so the lock is not in the critical path.
2167        let source_statistics = self.source_statistics.lock().expect("poisoned");
2168        source_statistics
2169            .webhook_statistics
2170            .get(&id)
2171            .cloned()
2172            .ok_or(StorageError::IdentifierMissing(id))
2173    }
2174
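    /// Waits until there is work for [`Self::process`]: a scheduled maintenance tick, pending
    /// table handle drops, or responses from storage instances.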
2175    async fn ready(&mut self) {
2176        if self.maintenance_scheduled {
2177            return;
2178        }
2179
2180        if !self.pending_table_handle_drops_rx.is_empty() {
2181            return;
2182        }
2183
2184        tokio::select! {
2185            Some(m) = self.instance_response_rx.recv() => {
2186                self.stashed_responses.push(m);
2187                while let Ok(m) = self.instance_response_rx.try_recv() {
2188                    self.stashed_responses.push(m);
2189                }
2190            }
2191            _ = self.maintenance_ticker.tick() => {
2192                self.maintenance_scheduled = true;
2193            },
2194        };
2195    }
2196
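    /// Processes work queued up since the last call: performs scheduled maintenance, rehydrates
    /// failed replicas, applies stashed instance responses (frontier updates, drop
    /// acknowledgements, statistics, status updates, staged batches), and drops tables whose
    /// persist handles have been released. Returns any resulting frontier updates.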
2197    #[instrument(level = "debug")]
2198    fn process(
2199        &mut self,
2200        storage_metadata: &StorageMetadata,
2201    ) -> Result<Option<Response<T>>, anyhow::Error> {
2202        // Perform periodic maintenance work.
2203        if self.maintenance_scheduled {
2204            self.maintain();
2205            self.maintenance_scheduled = false;
2206        }
2207
2208        for instance in self.instances.values_mut() {
2209            instance.rehydrate_failed_replicas();
2210        }
2211
2212        let mut status_updates = vec![];
2213        let mut updated_frontiers = BTreeMap::new();
2214
2215        // Take the currently stashed responses so that we can call mut receiver functions in the loop.
2216        let stashed_responses = std::mem::take(&mut self.stashed_responses);
2217        for resp in stashed_responses {
2218            match resp {
2219                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
2220                    self.update_write_frontier(id, &upper);
2221                    updated_frontiers.insert(id, upper);
2222                }
2223                (replica_id, StorageResponse::DroppedId(id)) => {
2224                    let replica_id = replica_id.expect("DroppedId from unknown replica");
2225                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
2226                        remaining_replicas.remove(&replica_id);
2227                        if remaining_replicas.is_empty() {
2228                            self.dropped_objects.remove(&id);
2229                        }
2230                    } else {
2231                        soft_panic_or_log!("unexpected DroppedId for {id}");
2232                    }
2233                }
2234                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
2235                    // Note we only hold the locks while moving some plain-old-data around here.
2236                    {
2237                        // NOTE(aljoscha): We explicitly unwrap the `Option`,
2238                        // because we expect that stats coming from replicas
2239                        // have a replica id.
2240                        //
2241                        // If this is `None` and we would use that to access
2242                        // state below we might clobber something unexpectedly.
2243                        let replica_id = if let Some(replica_id) = replica_id {
2244                            replica_id
2245                        } else {
2246                            tracing::error!(
2247                                ?source_stats,
2248                                "missing replica_id for source statistics update"
2249                            );
2250                            continue;
2251                        };
2252
2253                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");
2254
2255                        for stat in source_stats {
2256                            let collection_id = stat.id.clone();
2257
2258                            if self.collection(collection_id).is_err() {
2259                                // We can get updates for collections that have
2260                                // already been deleted, ignore those.
2261                                continue;
2262                            }
2263
2264                            let entry = shared_stats
2265                                .source_statistics
2266                                .entry((stat.id, Some(replica_id)));
2267
2268                            match entry {
2269                                btree_map::Entry::Vacant(vacant_entry) => {
2270                                    let mut stats = ControllerSourceStatistics::new(
2271                                        collection_id,
2272                                        Some(replica_id),
2273                                    );
2274                                    stats.incorporate(stat);
2275                                    vacant_entry.insert(stats);
2276                                }
2277                                btree_map::Entry::Occupied(mut occupied_entry) => {
2278                                    occupied_entry.get_mut().incorporate(stat);
2279                                }
2280                            }
2281                        }
2282                    }
2283
2284                    {
2285                        // NOTE(aljoscha): Same as above. We want to be
2286                        // explicit.
2287                        //
2288                        // Also, technically for sinks there are no webhook
2289                        // "sources" that would force us to use an `Option`. But
2290                        // we still have to use an option for other reasons: the
2291                        // scraper expects a trait for working with stats, and
2292                        // that in the end forces the sink stats map to also
2293                        // have `Option` in the key. Do I like that? No, but
2294                        // here we are.
2295                        let replica_id = if let Some(replica_id) = replica_id {
2296                            replica_id
2297                        } else {
2298                            tracing::error!(
2299                                ?sink_stats,
2300                                "missing replica_id for sink statistics update"
2301                            );
2302                            continue;
2303                        };
2304
2305                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
2306
2307                        for stat in sink_stats {
2308                            let collection_id = stat.id.clone();
2309
2310                            if self.collection(collection_id).is_err() {
2311                                // We can get updates for collections that have
2312                                // already been deleted, ignore those.
2313                                continue;
2314                            }
2315
2316                            let entry = shared_stats.entry((stat.id, Some(replica_id)));
2317
2318                            match entry {
2319                                btree_map::Entry::Vacant(vacant_entry) => {
2320                                    let mut stats =
2321                                        ControllerSinkStatistics::new(collection_id, replica_id);
2322                                    stats.incorporate(stat);
2323                                    vacant_entry.insert(stats);
2324                                }
2325                                btree_map::Entry::Occupied(mut occupied_entry) => {
2326                                    occupied_entry.get_mut().incorporate(stat);
2327                                }
2328                            }
2329                        }
2330                    }
2331                }
2332                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
2333                    // NOTE(aljoscha): We sniff out the hydration status for
2334                    // ingestions from status updates. This is the easiest we
2335                    // can do right now, without going deeper into changing the
2336                    // comms protocol between controller and cluster. We cannot,
2337                    // for example use `StorageResponse::FrontierUpper`,
2338                    // because those will already get sent when the ingestion is
2339                    // just being created.
2340                    //
2341                    // Sources differ in when they will report as Running. Kafka
2342                    // UPSERT sources will only switch to `Running` once their
2343                    // state has been initialized from persist, which is the
2344                    // first case that we care about right now.
2345                    //
2346                    // I wouldn't say it's ideal, but it's workable until we
2347                    // find something better.
2348                    match status_update.status {
2349                        Status::Running => {
2350                            let collection = self.collections.get_mut(&status_update.id);
2351                            match collection {
2352                                Some(collection) => {
2353                                    match collection.extra_state {
2354                                        CollectionStateExtra::Ingestion(
2355                                            ref mut ingestion_state,
2356                                        ) => {
2357                                            if ingestion_state.hydrated_on.is_empty() {
2358                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
2359                                            }
2360                                            ingestion_state.hydrated_on.insert(replica_id.expect(
2361                                                "replica id should be present for status running",
2362                                            ));
2363                                        }
2364                                        CollectionStateExtra::Export(_) => {
2365                                            // TODO(sinks): track sink hydration?
2366                                        }
2367                                        CollectionStateExtra::None => {
2368                                            // Nothing to do
2369                                        }
2370                                    }
2371                                }
2372                                None => (), // no collection, let's say that's fine
2373                                            // here
2374                            }
2375                        }
2376                        Status::Paused => {
2377                            let collection = self.collections.get_mut(&status_update.id);
2378                            match collection {
2379                                Some(collection) => {
2380                                    match collection.extra_state {
2381                                        CollectionStateExtra::Ingestion(
2382                                            ref mut ingestion_state,
2383                                        ) => {
2384                                            // TODO: Paused gets sent when there
2385                                            // are no active replicas. We should
2386                                            // change this to send a targeted
2387                                            // Pause for each replica, and do
2388                                            // more fine-grained hydration
2389                                            // tracking here.
2390                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
2391                                            ingestion_state.hydrated_on.clear();
2392                                        }
2393                                        CollectionStateExtra::Export(_) => {
2394                                            // TODO(sinks): track sink hydration?
2395                                        }
2396                                        CollectionStateExtra::None => {
2397                                            // Nothing to do
2398                                        }
2399                                    }
2400                                }
2401                                None => (), // no collection, let's say that's fine
2402                                            // here
2403                            }
2404                        }
2405                        _ => (),
2406                    }
2407
2408                    // Set replica_id in the status update if available
2409                    if let Some(id) = replica_id {
2410                        status_update.replica_id = Some(id);
2411                    }
2412                    status_updates.push(status_update);
2413                }
2414                (_replica_id, StorageResponse::StagedBatches(batches)) => {
2415                    for (ingestion_id, batches) in batches {
2416                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
2417                            Some(pending) => {
2418                                // Send a cancel command so our command history is correct. And to
2419                                // avoid duplicate work once we have active replication.
2420                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
2421                                {
2422                                    instance
2423                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
2424                                }
2425                                // Send the results down our channel.
2426                                (pending.result_tx)(batches)
2427                            }
2428                            None => {
2429                                // We might not be tracking this oneshot ingestion anymore because
2430                                // it was canceled.
2431                            }
2432                        }
2433                    }
2434                }
2435            }
2436        }
2437
2438        self.record_status_updates(status_updates);
2439
2440        // Process dropped tables in a single batch.
2441        let mut dropped_table_ids = Vec::new();
2442        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
2443            dropped_table_ids.push(dropped_id);
2444        }
2445        if !dropped_table_ids.is_empty() {
2446            self.drop_sources(storage_metadata, dropped_table_ids)?;
2447        }
2448
2449        if updated_frontiers.is_empty() {
2450            Ok(None)
2451        } else {
2452            Ok(Some(Response::FrontierUpdates(
2453                updated_frontiers.into_iter().collect(),
2454            )))
2455        }
2456    }
2457
2458    async fn inspect_persist_state(
2459        &self,
2460        id: GlobalId,
2461    ) -> Result<serde_json::Value, anyhow::Error> {
2462        let collection = &self.storage_collections.collection_metadata(id)?;
2463        let client = self
2464            .persist
2465            .open(collection.persist_location.clone())
2466            .await?;
2467        let shard_state = client
2468            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2469            .await?;
2470        let json_state = serde_json::to_value(shard_state)?;
2471        Ok(json_state)
2472    }
2473
2474    fn append_introspection_updates(
2475        &mut self,
2476        type_: IntrospectionType,
2477        updates: Vec<(Row, Diff)>,
2478    ) {
2479        let id = self.introspection_ids[&type_];
2480        let updates = updates.into_iter().map(|update| update.into()).collect();
2481        self.collection_manager.blind_write(id, updates);
2482    }
2483
2484    fn append_status_introspection_updates(
2485        &mut self,
2486        type_: IntrospectionType,
2487        updates: Vec<StatusUpdate>,
2488    ) {
2489        let id = self.introspection_ids[&type_];
2490        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2491        if !updates.is_empty() {
2492            self.collection_manager.blind_write(id, updates);
2493        }
2494    }
2495
2496    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2497        let id = self.introspection_ids[&type_];
2498        self.collection_manager.differential_write(id, op);
2499    }
2500
2501    fn append_only_introspection_tx(
2502        &self,
2503        type_: IntrospectionType,
2504    ) -> mpsc::UnboundedSender<(
2505        Vec<AppendOnlyUpdate>,
2506        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2507    )> {
2508        let id = self.introspection_ids[&type_];
2509        self.collection_manager.append_only_write_sender(id)
2510    }
2511
2512    fn differential_introspection_tx(
2513        &self,
2514        type_: IntrospectionType,
2515    ) -> mpsc::UnboundedSender<(
2516        StorageWriteOp,
2517        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2518    )> {
2519        let id = self.introspection_ids[&type_];
2520        self.collection_manager.differential_write_sender(id)
2521    }
2522
2523    async fn real_time_recent_timestamp(
2524        &self,
2525        timestamp_objects: BTreeSet<GlobalId>,
2526        timeout: Duration,
2527    ) -> Result<
2528        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
2529        StorageError<Self::Timestamp>,
2530    > {
2531        use mz_storage_types::sources::GenericSourceConnection;
2532
2533        let mut rtr_futures = BTreeMap::new();
2534
2535        // Only user sources can be read with real-time recency (RTR).
2536        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
2537            let collection = match self.collection(id) {
2538                Ok(c) => c,
2539                // Not a storage item, which we accept.
2540                Err(_) => continue,
2541            };
2542
2543            let (source_conn, remap_id) = match &collection.data_source {
2544                DataSource::Ingestion(IngestionDescription {
2545                    desc: SourceDesc { connection, .. },
2546                    remap_collection_id,
2547                    ..
2548                }) => match connection {
2549                    GenericSourceConnection::Kafka(_)
2550                    | GenericSourceConnection::Postgres(_)
2551                    | GenericSourceConnection::MySql(_)
2552                    | GenericSourceConnection::SqlServer(_) => {
2553                        (connection.clone(), *remap_collection_id)
2554                    }
2555
2556                    // These internal sources do not yet (and might never)
2557                    // support RTR. However, erroring when they're selected
2558                    // would be an annoying user experience, so we simply
2559                    // skip over them instead.
2560                    GenericSourceConnection::LoadGenerator(_) => continue,
2561                },
2562                // Skip over all other objects
2563                _ => {
2564                    continue;
2565                }
2566            };
2567
2568            // Prepare for getting the external system's frontier.
2569            let config = self.config().clone();
2570
2571            // Determine the remap collection we plan to read from.
2572            //
2573            // Note that reading from the remap shard works the same way here
2574            // as in other places in this code, but we inline it because we
2575            // must not capture `self` when moving the stream of data from the
2576            // remap shard into a future.
2578            let read_handle = self.read_handle_for_snapshot(remap_id).await?;
2579
2580            // Have to acquire a read hold to prevent the since from advancing
2581            // while we read.
2582            let remap_read_hold = self
2583                .storage_collections
2584                .acquire_read_holds(vec![remap_id])
2585                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
2586                .expect_element(|| "known to be exactly one");
2587
2588            let remap_as_of = remap_read_hold
2589                .since()
2590                .to_owned()
2591                .into_option()
2592                .ok_or(StorageError::ReadBeforeSince(remap_id))?;
2593
2594            rtr_futures.insert(
2595                id,
2596                tokio::time::timeout(timeout, async move {
2597                    use mz_storage_types::sources::SourceConnection as _;
2598
2599                    // Fetch the remap shard's contents; we must do this first so
2600                    // that the `as_of` doesn't change.
2601                    let as_of = Antichain::from_elem(remap_as_of);
2602                    let remap_subscribe = read_handle
2603                        .subscribe(as_of.clone())
2604                        .await
2605                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;
2606
2607                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");
2608
2609                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
2610                        .await.map_err(|e| {
2611                            tracing::debug!(?id, "real time recency error: {:?}", e);
2612                            e
2613                        });
2614
2615                    // Drop the read hold once we have read successfully.
2616                    drop(remap_read_hold);
2617
2618                    result
2619                }),
2620            );
2621        }
2622
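            // The returned future resolves to the maximum real-time-recency
            // timestamp across all sources; a timeout or error for any single
            // source fails the whole request.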
2623        Ok(Box::pin(async move {
2624            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
2625            ids.into_iter()
2626                .zip_eq(futures::future::join_all(futs).await)
2627                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
2628                    let new =
2629                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
2630                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
2631                })
2632        }))
2633    }
2634}
2635
2636/// Seed [`StorageTxn`] with any state required to instantiate a
2637/// [`StorageController`].
2638///
2639/// This cannot be a method on [`StorageController`] because it must run before a
2640/// controller exists and therefore cannot take a `self` parameter.
2641///
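    /// A minimal sketch of the intended call order; the `txn` value and the
    /// elided controller arguments are illustrative assumptions, not a
    /// prescribed API:
    ///
    /// ```ignore
    /// // Seed the txn with the txn-wal shard before building the controller.
    /// prepare_initialization(&mut txn)?;
    /// let controller = Controller::new(/* ..., */ &txn, storage_collections).await;
    /// ```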
2642pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2643    if txn.get_txn_wal_shard().is_none() {
2644        let txns_id = ShardId::new();
2645        txn.write_txn_wal_shard(txns_id)?;
2646    }
2647
2648    Ok(())
2649}
2650
2651impl<T> Controller<T>
2652where
2653    T: Timestamp
2654        + Lattice
2655        + TotalOrder
2656        + Codec64
2657        + From<EpochMillis>
2658        + TimestampManipulation
2659        + Into<Datum<'static>>,
2660    StorageCommand<T>: RustType<ProtoStorageCommand>,
2661    StorageResponse<T>: RustType<ProtoStorageResponse>,
2662    Self: StorageController<Timestamp = T>,
2663{
2664    /// Create a new storage controller from a client it should wrap.
2665    ///
2666    /// Note that when creating a new storage controller, you must also
2667    /// reconcile it with the previous state.
2668    ///
2669    /// # Panics
2670    /// If this function is called before [`prepare_initialization`].
2671    pub async fn new(
2672        build_info: &'static BuildInfo,
2673        persist_location: PersistLocation,
2674        persist_clients: Arc<PersistClientCache>,
2675        now: NowFn,
2676        wallclock_lag: WallclockLagFn<T>,
2677        txns_metrics: Arc<TxnMetrics>,
2678        read_only: bool,
2679        metrics_registry: &MetricsRegistry,
2680        controller_metrics: ControllerMetrics,
2681        connection_context: ConnectionContext,
2682        txn: &dyn StorageTxn<T>,
2683        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
2684    ) -> Self {
2685        let txns_client = persist_clients
2686            .open(persist_location.clone())
2687            .await
2688            .expect("location should be valid");
2689
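            // Proactively warm persist state for all known collections in the
            // background so that later shard opens are fast; the task is
            // aborted if it is still running when the controller is dropped.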
2690        let persist_warm_task = warm_persist_state_in_background(
2691            txns_client.clone(),
2692            txn.get_collection_metadata().into_values(),
2693        );
2694        let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2695
2696        // This value must already be installed: we must ensure it is durably
2697        // recorded before it is used, otherwise we risk leaking persist
2698        // state.
2699        let txns_id = txn
2700            .get_txn_wal_shard()
2701            .expect("must call prepare initialization before creating storage controller");
2702
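            // In read-only mode the table write worker must not write to the
            // txns shard, so it only follows the shard's upper; otherwise it
            // gets a full transactional (txn-wal) handle.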
2703        let persist_table_worker = if read_only {
2704            let txns_write = txns_client
2705                .open_writer(
2706                    txns_id,
2707                    Arc::new(TxnsCodecRow::desc()),
2708                    Arc::new(UnitSchema),
2709                    Diagnostics {
2710                        shard_name: "txns".to_owned(),
2711                        handle_purpose: "follow txns upper".to_owned(),
2712                    },
2713                )
2714                .await
2715                .expect("txns schema shouldn't change");
2716            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2717        } else {
2718            let txns = TxnsHandle::open(
2719                T::minimum(),
2720                txns_client.clone(),
2721                txns_client.dyncfgs().clone(),
2722                Arc::clone(&txns_metrics),
2723                txns_id,
2724            )
2725            .await;
2726            persist_handles::PersistTableWriteWorker::new_txns(txns)
2727        };
2728        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2729
2730        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2731
2732        let introspection_ids = BTreeMap::new();
2733        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2734
2735        let (statistics_interval_sender, _) =
2736            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2737
2738        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2739            tokio::sync::mpsc::unbounded_channel();
2740
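            // Drive periodic maintenance (see `maintain`) roughly once per
            // second, skipping missed ticks rather than trying to catch up.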
2741        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2742        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2743
2744        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2745
2746        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2747
2748        let now_dt = mz_ore::now::to_datetime(now());
2749
2750        Self {
2751            build_info,
2752            collections: BTreeMap::default(),
2753            dropped_objects: Default::default(),
2754            persist_table_worker,
2755            txns_read,
2756            txns_metrics,
2757            stashed_responses: vec![],
2758            pending_table_handle_drops_tx,
2759            pending_table_handle_drops_rx,
2760            pending_oneshot_ingestions: BTreeMap::default(),
2761            collection_manager,
2762            introspection_ids,
2763            introspection_tokens,
2764            now,
2765            read_only,
2766            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2767                source_statistics: BTreeMap::new(),
2768                webhook_statistics: BTreeMap::new(),
2769            })),
2770            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2771            statistics_interval_sender,
2772            instances: BTreeMap::new(),
2773            initialized: false,
2774            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2775            persist_location,
2776            persist: persist_clients,
2777            metrics,
2778            recorded_frontiers: BTreeMap::new(),
2779            recorded_replica_frontiers: BTreeMap::new(),
2780            wallclock_lag,
2781            wallclock_lag_last_recorded: now_dt,
2782            storage_collections,
2783            migrated_storage_collections: BTreeSet::new(),
2784            maintenance_ticker,
2785            maintenance_scheduled: false,
2786            instance_response_rx,
2787            instance_response_tx,
2788            persist_warm_task,
2789        }
2790    }
2791
2792    // This is different from `set_read_policies`, which is for external users.
2793    // This method is for setting the policy that the controller uses when
2794    // maintaining the read holds that it has for collections/exports at the
2795    // StorageCollections.
2796    //
2797    // This is really only used when dropping things, where we set the
2798    // ReadPolicy to the empty Antichain.
2799    #[instrument(level = "debug")]
2800    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
2801        let mut read_capability_changes = BTreeMap::default();
2802
2803        for (id, policy) in policies.into_iter() {
2804            if let Some(collection) = self.collections.get_mut(&id) {
2805                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2806                {
2807                    CollectionStateExtra::Ingestion(ingestion) => (
2808                        ingestion.write_frontier.borrow(),
2809                        &mut ingestion.derived_since,
2810                        &mut ingestion.hold_policy,
2811                    ),
2812                    CollectionStateExtra::None => {
2813                        unreachable!("set_hold_policies is only called for ingestions");
2814                    }
2815                    CollectionStateExtra::Export(export) => (
2816                        export.write_frontier.borrow(),
2817                        &mut export.derived_since,
2818                        &mut export.read_policy,
2819                    ),
2820                };
2821
2822                let new_derived_since = policy.frontier(write_frontier);
2823                let mut update = swap_updates(derived_since, new_derived_since);
2824                if !update.is_empty() {
2825                    read_capability_changes.insert(id, update);
2826                }
2827
2828                *hold_policy = policy;
2829            }
2830        }
2831
2832        if !read_capability_changes.is_empty() {
2833            self.update_hold_capabilities(&mut read_capability_changes);
2834        }
2835    }
2836
2837    #[instrument(level = "debug", fields(updates))]
2838    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2839        let mut read_capability_changes = BTreeMap::default();
2840
2841        if let Some(collection) = self.collections.get_mut(&id) {
2842            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2843                CollectionStateExtra::Ingestion(ingestion) => (
2844                    &mut ingestion.write_frontier,
2845                    &mut ingestion.derived_since,
2846                    &ingestion.hold_policy,
2847                ),
2848                CollectionStateExtra::None => {
2849                    if matches!(collection.data_source, DataSource::Progress) {
2850                        // We do get these, but can't do anything with them.
2851                    } else {
2852                        tracing::error!(
2853                            ?collection,
2854                            ?new_upper,
2855                            "updated write frontier for collection which is not an ingestion"
2856                        );
2857                    }
2858                    return;
2859                }
2860                CollectionStateExtra::Export(export) => (
2861                    &mut export.write_frontier,
2862                    &mut export.derived_since,
2863                    &export.read_policy,
2864                ),
2865            };
2866
2867            if PartialOrder::less_than(write_frontier, new_upper) {
2868                write_frontier.clone_from(new_upper);
2869            }
2870
2871            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2872            let mut update = swap_updates(derived_since, new_derived_since);
2873            if !update.is_empty() {
2874                read_capability_changes.insert(id, update);
2875            }
2876        } else if self.dropped_objects.contains_key(&id) {
2877            // We dropped an object but might still get updates from the cluster
2878            // side, before it notices the drop. This is expected and fine.
2879        } else {
2880            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2881        }
2882
2883        if !read_capability_changes.is_empty() {
2884            self.update_hold_capabilities(&mut read_capability_changes);
2885        }
2886    }
2887
2888    // This is different from `update_read_capabilities`, which is for external users.
2889    // This method is for maintaining the read holds that the controller has at
2890    // the StorageCollections, for storage dependencies.
2891    #[instrument(level = "debug", fields(updates))]
2892    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2893        // Location to record consequences that we need to act on.
2894        let mut collections_net = BTreeMap::new();
2895
2896        // We must not rely on any specific relative ordering of `GlobalId`s.
2897        // That said, it is reasonable to assume that collections generally have
2898        // greater IDs than their dependencies, so starting with the largest is
2899        // a useful optimization.
2900        while let Some(key) = updates.keys().rev().next().cloned() {
2901            let mut update = updates.remove(&key).unwrap();
2902
2903            if key.is_user() {
2904                debug!(id = %key, ?update, "update_hold_capability");
2905            }
2906
2907            if let Some(collection) = self.collections.get_mut(&key) {
2908                match &mut collection.extra_state {
2909                    CollectionStateExtra::Ingestion(ingestion) => {
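                            // Apply the updates to this ingestion's read
                            // capabilities; `update_iter` yields the resulting
                            // net changes to the read frontier, which we
                            // accumulate in `collections_net` below.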
2910                        let changes = ingestion.read_capabilities.update_iter(update.drain());
2911                        update.extend(changes);
2912
2913                        let (changes, frontier, _cluster_id) =
2914                            collections_net.entry(key).or_insert_with(|| {
2915                                (
2916                                    <ChangeBatch<_>>::new(),
2917                                    Antichain::new(),
2918                                    ingestion.instance_id,
2919                                )
2920                            });
2921
2922                        changes.extend(update.drain());
2923                        *frontier = ingestion.read_capabilities.frontier().to_owned();
2924                    }
2925                    CollectionStateExtra::None => {
2926                        // WIP: See if this ever panics in CI.
2927                        soft_panic_or_log!(
2928                            "trying to update holds for collection {collection:?} which is not \
2929                             an ingestion: {update:?}"
2930                        );
2931                        continue;
2932                    }
2933                    CollectionStateExtra::Export(export) => {
2934                        let changes = export.read_capabilities.update_iter(update.drain());
2935                        update.extend(changes);
2936
2937                        let (changes, frontier, _cluster_id) =
2938                            collections_net.entry(key).or_insert_with(|| {
2939                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
2940                            });
2941
2942                        changes.extend(update.drain());
2943                        *frontier = export.read_capabilities.frontier().to_owned();
2944                    }
2945                }
2946            } else {
2947                // This is confusing and we should probably error.
2948                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
2949            }
2950        }
2951
2952        // Translate the net changes into read hold downgrades and `AllowCompaction`
2953        // commands for the affected instances.
2954        for (key, (mut changes, frontier, cluster_id)) in collections_net {
2955            if !changes.is_empty() {
2956                if key.is_user() {
2957                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
2958                }
2959
2960                let collection = self
2961                    .collections
2962                    .get_mut(&key)
2963                    .expect("missing collection state");
2964
2965                let read_holds = match &mut collection.extra_state {
2966                    CollectionStateExtra::Ingestion(ingestion) => {
2967                        ingestion.dependency_read_holds.as_mut_slice()
2968                    }
2969                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
2970                    CollectionStateExtra::None => {
2971                        soft_panic_or_log!(
2972                            "trying to downgrade read holds for collection which is not an \
2973                             ingestion: {collection:?}"
2974                        );
2975                        continue;
2976                    }
2977                };
2978
2979                for read_hold in read_holds.iter_mut() {
2980                    read_hold
2981                        .try_downgrade(frontier.clone())
2982                        .expect("we only advance the frontier");
2983                }
2984
2985                // Send AllowCompaction command directly to the instance
2986                if let Some(instance) = self.instances.get_mut(&cluster_id) {
2987                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
2988                } else {
2989                    soft_panic_or_log!(
2990                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
2991                    );
2992                }
2993            }
2994        }
2995    }
2996
2997    /// Validate that a collection exists for all identifiers, and error if any do not.
2998    fn validate_collection_ids(
2999        &self,
3000        ids: impl Iterator<Item = GlobalId>,
3001    ) -> Result<(), StorageError<T>> {
3002        for id in ids {
3003            self.storage_collections.check_exists(id)?;
3004        }
3005        Ok(())
3006    }
3007
3008    /// Validate that an export exists for all identifiers, and error if any do not.
3009    fn validate_export_ids(
3010        &self,
3011        ids: impl Iterator<Item = GlobalId>,
3012    ) -> Result<(), StorageError<T>> {
3013        for id in ids {
3014            self.export(id)?;
3015        }
3016        Ok(())
3017    }
3018
3019    /// Opens a write handle for the given `shard`.
3020    ///
3021    /// The returned handle's upper is refreshed after opening, so callers observe
3022    /// the shard's most recent upper rather than a stale cached value.
3026    async fn open_data_handles(
3027        &self,
3028        id: &GlobalId,
3029        shard: ShardId,
3030        relation_desc: RelationDesc,
3031        persist_client: &PersistClient,
3032    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
3033        let diagnostics = Diagnostics {
3034            shard_name: id.to_string(),
3035            handle_purpose: format!("controller data for {}", id),
3036        };
3037
3038        let mut write = persist_client
3039            .open_writer(
3040                shard,
3041                Arc::new(relation_desc),
3042                Arc::new(UnitSchema),
3043                diagnostics.clone(),
3044            )
3045            .await
3046            .expect("invalid persist usage");
3047
3048        // N.B.
3049        // Fetch the most recent upper for the write handle. Otherwise, this may be behind
3050        // the since of the since handle. It's vital this happens AFTER we create
3051        // the since handle as it needs to be linearized with that operation. It may be true
3052        // that creating the write handle after the since handle already ensures this, but we
3053        // do this out of an abundance of caution.
3054        //
3055        // Note that this returns the upper, but also sets it on the handle to be fetched later.
3056        write.fetch_recent_upper().await;
3057
3058        write
3059    }
3060
3061    /// Registers the given introspection collection and does any preparatory
3062    /// work that we have to do before we start writing to it. This
3063    /// preparatory work will include partial truncation or other cleanup
3064    /// schemes, depending on introspection type.
3065    fn register_introspection_collection(
3066        &mut self,
3067        id: GlobalId,
3068        introspection_type: IntrospectionType,
3069        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
3070        persist_client: PersistClient,
3071    ) -> Result<(), StorageError<T>> {
3072        tracing::info!(%id, ?introspection_type, "registering introspection collection");
3073
3074        // In read-only mode we create a new shard for all migrated storage collections, so we
3075        // "trick" the write task into thinking that it's not in read-only mode, ensuring that
3076        // something is advancing this new shard.
3077        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
3078        if force_writable {
3079            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
3080            info!("writing to migrated storage collection {id} in read-only mode");
3081        }
3082
3083        let prev = self.introspection_ids.insert(introspection_type, id);
3084        assert!(
3085            prev.is_none(),
3086            "cannot have multiple IDs for introspection type"
3087        );
3088
3089        let metadata = self.storage_collections.collection_metadata(id)?.clone();
3090
3091        let read_handle_fn = move || {
3092            let persist_client = persist_client.clone();
3093            let metadata = metadata.clone();
3094
3095            let fut = async move {
3096                let read_handle = persist_client
3097                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
3098                        metadata.data_shard,
3099                        Arc::new(metadata.relation_desc.clone()),
3100                        Arc::new(UnitSchema),
3101                        Diagnostics {
3102                            shard_name: id.to_string(),
3103                            handle_purpose: format!("snapshot {}", id),
3104                        },
3105                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3106                    )
3107                    .await
3108                    .expect("invalid persist usage");
3109                read_handle
3110            };
3111
3112            fut.boxed()
3113        };
3114
3115        let recent_upper = write_handle.shared_upper();
3116
3117        match CollectionManagerKind::from(&introspection_type) {
3118            // For these, we first register the collection and then prepare it,
3119            // because the code that prepares a differential collection expects to
3120            // be able to update desired state via the collection manager
3121            // already.
3122            CollectionManagerKind::Differential => {
3123                let statistics_retention_duration =
3124                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());
3125
3126                // These clones are shallow; they mostly just bump `Arc` reference counts.
3127                let introspection_config = DifferentialIntrospectionConfig {
3128                    recent_upper,
3129                    introspection_type,
3130                    storage_collections: Arc::clone(&self.storage_collections),
3131                    collection_manager: self.collection_manager.clone(),
3132                    source_statistics: Arc::clone(&self.source_statistics),
3133                    sink_statistics: Arc::clone(&self.sink_statistics),
3134                    statistics_interval: self.config.parameters.statistics_interval.clone(),
3135                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
3136                    statistics_retention_duration,
3137                    metrics: self.metrics.clone(),
3138                    introspection_tokens: Arc::clone(&self.introspection_tokens),
3139                };
3140                self.collection_manager.register_differential_collection(
3141                    id,
3142                    write_handle,
3143                    read_handle_fn,
3144                    force_writable,
3145                    introspection_config,
3146                );
3147            }
3148            // For these, we first have to prepare and then register with
3149            // the collection manager, because the preparation logic wants to read
3150            // the shard's contents and then do uncontested writes.
3151            //
3152            // TODO(aljoscha): We should make the truncation/cleanup work that
3153            // happens when we take over instead be a periodic thing, and make
3154            // it resilient to the upper moving concurrently.
3155            CollectionManagerKind::AppendOnly => {
3156                let introspection_config = AppendOnlyIntrospectionConfig {
3157                    introspection_type,
3158                    config_set: Arc::clone(self.config.config_set()),
3159                    parameters: self.config.parameters.clone(),
3160                    storage_collections: Arc::clone(&self.storage_collections),
3161                };
3162                self.collection_manager.register_append_only_collection(
3163                    id,
3164                    write_handle,
3165                    force_writable,
3166                    Some(introspection_config),
3167                );
3168            }
3169        }
3170
3171        Ok(())
3172    }
3173
3174    /// Remove statistics for sources/sinks that were dropped but still have statistics rows
3175    /// hanging around.
3176    fn reconcile_dangling_statistics(&self) {
3177        self.source_statistics
3178            .lock()
3179            .expect("poisoned")
3180            .source_statistics
3181            // collections should also contain subsources.
3182            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3183        self.sink_statistics
3184            .lock()
3185            .expect("poisoned")
3186            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3187    }
3188
3189    /// Appends a new (global ID, shard ID) pair to the appropriate collection.
3190    /// Use a `diff` of 1 to append a new entry; -1 to retract an existing
3191    /// entry.
3192    ///
3193    /// # Panics
3194    /// - If `self.collections` does not have an entry for `global_id`.
3195    /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
3196    ///   a managed collection.
3197    /// - If diff is any value other than `1` or `-1`.
3198    #[instrument(level = "debug")]
3199    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3200    where
3201        I: Iterator<Item = GlobalId>,
3202    {
3203        mz_ore::soft_assert_or_log!(
3204            diff == Diff::MINUS_ONE || diff == Diff::ONE,
3205            "use 1 for insert or -1 for delete"
3206        );
3207
3208        let id = *self
3209            .introspection_ids
3210            .get(&IntrospectionType::ShardMapping)
3211            .expect("should be registered before this call");
3212
3213        let mut updates = vec![];
3214        // Pack updates into rows
3215        let mut row_buf = Row::default();
3216
3217        for global_id in global_ids {
3218            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3219                collection.collection_metadata.data_shard.clone()
3220            } else {
3221                panic!("unknown global id: {}", global_id);
3222            };
3223
3224            let mut packer = row_buf.packer();
3225            packer.push(Datum::from(global_id.to_string().as_str()));
3226            packer.push(Datum::from(shard_id.to_string().as_str()));
3227            updates.push((row_buf.clone(), diff));
3228        }
3229
3230        self.collection_manager.differential_append(id, updates);
3231    }
3232
3233    /// Determines and returns this collection's dependencies, if any.
3234    fn determine_collection_dependencies(
3235        &self,
3236        self_id: GlobalId,
3237        data_source: &DataSource<T>,
3238    ) -> Result<Vec<GlobalId>, StorageError<T>> {
3239        let dependency = match &data_source {
3240            DataSource::Introspection(_)
3241            | DataSource::Webhook
3242            | DataSource::Table { primary: None }
3243            | DataSource::Progress
3244            | DataSource::Other => vec![],
3245            DataSource::Table {
3246                primary: Some(primary),
3247            } => vec![*primary],
3248            DataSource::IngestionExport { ingestion_id, .. } => {
3249                // Ingestion exports depend on their primary source's remap
3250                // collection.
3251                let source_collection = self.collection(*ingestion_id)?;
3252                let ingestion_remap_collection_id = match &source_collection.data_source {
3253                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3254                    _ => unreachable!(
3255                        "SourceExport must only refer to primary sources that already exist"
3256                    ),
3257                };
3258
3259                // Ingestion exports (a.k.a. subsources) must make sure that 1)
3260                // their own collection's since stays one step behind the upper,
3261                // and 2) the remap shard's since stays one step behind
3262                // their upper. Hence they track themselves and the remap shard
3263                // as dependencies.
3264                vec![self_id, ingestion_remap_collection_id]
3265            }
3266            // Ingestions depend on their remap collection.
3267            DataSource::Ingestion(ingestion) => {
3268                // Ingestions must make sure that 1) their own collection's
3269                // since stays one step behind the upper, and 2) the remap
3270                // shard's since stays one step behind their upper. Hence they
3271                // track themselves and the remap shard as dependencies.
3272                vec![self_id, ingestion.remap_collection_id]
3273            }
3274            DataSource::Sink { desc } => {
3275                // Sinks hold back their own frontier and the frontier of their input.
3276                vec![self_id, desc.sink.from]
3277            }
3278        };
3279
3280        Ok(dependency)
3281    }
3282
3283    async fn read_handle_for_snapshot(
3284        &self,
3285        id: GlobalId,
3286    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3287        let metadata = self.storage_collections.collection_metadata(id)?;
3288        read_handle_for_snapshot(&self.persist, id, &metadata).await
3289    }
3290
3291    /// Handles writing of status updates for sources/sinks to the appropriate
3292    /// status relation
3293    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3294        if self.read_only {
3295            return;
3296        }
3297
3298        let mut sink_status_updates = vec![];
3299        let mut source_status_updates = vec![];
3300
3301        for update in updates {
3302            let id = update.id;
3303            if self.export(id).is_ok() {
3304                sink_status_updates.push(update);
3305            } else if self.storage_collections.check_exists(id).is_ok() {
3306                source_status_updates.push(update);
3307            }
3308        }
3309
3310        self.append_status_introspection_updates(
3311            IntrospectionType::SourceStatusHistory,
3312            source_status_updates,
3313        );
3314        self.append_status_introspection_updates(
3315            IntrospectionType::SinkStatusHistory,
3316            sink_status_updates,
3317        );
3318    }
3319
3320    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3321        self.collections
3322            .get(&id)
3323            .ok_or(StorageError::IdentifierMissing(id))
3324    }
3325
3326    /// Runs the identified ingestion using the current definition of the
3327    /// ingestion in-memory.
3328    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3329        tracing::info!(%id, "starting ingestion");
3330
3331        let collection = self.collection(id)?;
3332        let ingestion_description = match &collection.data_source {
3333            DataSource::Ingestion(i) => i.clone(),
3334            _ => {
3335                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3336                Err(StorageError::IdentifierInvalid(id))?
3337            }
3338        };
3339
3340        // Enrich all of the exports with their metadata
3341        let mut source_exports = BTreeMap::new();
3342        for (
3343            export_id,
3344            SourceExport {
3345                storage_metadata: (),
3346                details,
3347                data_config,
3348            },
3349        ) in ingestion_description.source_exports.clone()
3350        {
3351            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3352            source_exports.insert(
3353                export_id,
3354                SourceExport {
3355                    storage_metadata: export_storage_metadata,
3356                    details,
3357                    data_config,
3358                },
3359            );
3360        }
3361
3362        let description = IngestionDescription::<CollectionMetadata> {
3363            source_exports,
3364            // The ingestion metadata is simply the collection metadata of the collection with
3365            // the associated ingestion
3366            ingestion_metadata: collection.collection_metadata.clone(),
3367            // The rest of the fields are identical
3368            desc: ingestion_description.desc.clone(),
3369            instance_id: ingestion_description.instance_id,
3370            remap_collection_id: ingestion_description.remap_collection_id,
3371        };
3372
3373        let storage_instance_id = description.instance_id;
3374        // Fetch the client for this ingestion's instance.
3375        let instance = self
3376            .instances
3377            .get_mut(&storage_instance_id)
3378            .ok_or_else(|| StorageError::IngestionInstanceMissing {
3379                storage_instance_id,
3380                ingestion_id: id,
3381            })?;
3382
3383        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3384        instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3385
3386        Ok(())
3387    }
3388
3389    /// Runs the identified export using the current definition of the export
3390    /// that we have in memory.
3391    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3392        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3393            return Err(StorageError::IdentifierMissing(id));
3394        };
3395
3396        let from_storage_metadata = self
3397            .storage_collections
3398            .collection_metadata(description.sink.from)?;
3399        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3400
3401        // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
3402        // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
3403        // reading it; otherwise assume we may have to replay from the beginning.
3404        let export_state = self.storage_collections.collection_frontiers(id)?;
3405        let mut as_of = description.sink.as_of.clone();
3406        as_of.join_assign(&export_state.implied_capability);
3407        let with_snapshot = description.sink.with_snapshot
3408            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);
3409
3410        info!(
3411            sink_id = %id,
3412            from_id = %description.sink.from,
3413            write_frontier = ?export_state.write_frontier,
3414            ?as_of,
3415            ?with_snapshot,
3416            "run_export"
3417        );
3418
3419        let cmd = RunSinkCommand {
3420            id,
3421            description: StorageSinkDesc {
3422                from: description.sink.from,
3423                from_desc: description.sink.from_desc.clone(),
3424                connection: description.sink.connection.clone(),
3425                envelope: description.sink.envelope,
3426                as_of,
3427                version: description.sink.version,
3428                from_storage_metadata,
3429                with_snapshot,
3430                to_storage_metadata,
3431            },
3432        };
3433
3434        let storage_instance_id = description.instance_id.clone();
3435
3436        let instance = self
3437            .instances
3438            .get_mut(&storage_instance_id)
3439            .ok_or_else(|| StorageError::ExportInstanceMissing {
3440                storage_instance_id,
3441                export_id: id,
3442            })?;
3443
3444        instance.send(StorageCommand::RunSink(Box::new(cmd)));
3445
3446        Ok(())
3447    }
3448
3449    /// Update introspection with the current frontiers of storage objects.
3450    ///
3451    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3452    /// second during normal operation.
3453    fn update_frontier_introspection(&mut self) {
3454        let mut global_frontiers = BTreeMap::new();
3455        let mut replica_frontiers = BTreeMap::new();
3456
3457        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
3458            let id = collection_frontiers.id;
3459            let since = collection_frontiers.read_capabilities;
3460            let upper = collection_frontiers.write_frontier;
3461
3462            let instance = self
3463                .collections
3464                .get(&id)
3465                .and_then(|collection_state| match &collection_state.extra_state {
3466                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
3467                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
3468                    CollectionStateExtra::None => None,
3469                })
3470                .and_then(|i| self.instances.get(&i));
3471
3472            if let Some(instance) = instance {
3473                for replica_id in instance.replica_ids() {
3474                    replica_frontiers.insert((id, replica_id), upper.clone());
3475                }
3476            }
3477
3478            global_frontiers.insert(id, (since, upper));
3479        }
3480
3481        let mut global_updates = Vec::new();
3482        let mut replica_updates = Vec::new();
3483
3484        let mut push_global_update =
3485            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
3486                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
3487                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3488                let row = Row::pack_slice(&[
3489                    Datum::String(&id.to_string()),
3490                    read_frontier,
3491                    write_frontier,
3492                ]);
3493                global_updates.push((row, diff));
3494            };
3495
3496        let mut push_replica_update =
3497            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
3498                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3499                let row = Row::pack_slice(&[
3500                    Datum::String(&id.to_string()),
3501                    Datum::String(&replica_id.to_string()),
3502                    write_frontier,
3503                ]);
3504                replica_updates.push((row, diff));
3505            };
3506
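            // Diff the newly recorded frontiers against the previous ones and
            // emit a retraction/insertion pair only for entries that changed,
            // so the introspection collections are maintained incrementally.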
3507        let mut old_global_frontiers =
3508            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
3509        for (&id, new) in &self.recorded_frontiers {
3510            match old_global_frontiers.remove(&id) {
3511                Some(old) if &old != new => {
3512                    push_global_update(id, new.clone(), Diff::ONE);
3513                    push_global_update(id, old, Diff::MINUS_ONE);
3514                }
3515                Some(_) => (),
3516                None => push_global_update(id, new.clone(), Diff::ONE),
3517            }
3518        }
3519        for (id, old) in old_global_frontiers {
3520            push_global_update(id, old, Diff::MINUS_ONE);
3521        }
3522
3523        let mut old_replica_frontiers =
3524            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
3525        for (&key, new) in &self.recorded_replica_frontiers {
3526            match old_replica_frontiers.remove(&key) {
3527                Some(old) if &old != new => {
3528                    push_replica_update(key, new.clone(), Diff::ONE);
3529                    push_replica_update(key, old, Diff::MINUS_ONE);
3530                }
3531                Some(_) => (),
3532                None => push_replica_update(key, new.clone(), Diff::ONE),
3533            }
3534        }
3535        for (key, old) in old_replica_frontiers {
3536            push_replica_update(key, old, Diff::MINUS_ONE);
3537        }
3538
3539        let id = self.introspection_ids[&IntrospectionType::Frontiers];
3540        self.collection_manager
3541            .differential_append(id, global_updates);
3542
3543        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
3544        self.collection_manager
3545            .differential_append(id, replica_updates);
3546    }
3547
3548    /// Refresh the wallclock lag introspection and metrics with the current lag values.
3549    ///
3550    /// This method produces wallclock lag metrics of two different shapes:
3551    ///
3552    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
3553    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
3554    ///   over the last minute, together with the current time.
3555    /// * Histograms: For each collection, we measure the lag of the write frontier behind
3556    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
3557    ///   together with the current histogram period.
3558    ///
3559    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
3560    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
3561    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
3562    /// Prometheus.
3563    ///
3564    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3565    /// second during normal operation.
3566    fn refresh_wallclock_lag(&mut self) {
3567        let now_ms = (self.now)();
3568        let histogram_period =
3569            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3570
3571        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
3572            Some(ts) => (self.wallclock_lag)(ts.clone()),
3573            None => Duration::ZERO,
3574        };
3575
3576        for frontiers in self.storage_collections.active_collection_frontiers() {
3577            let id = frontiers.id;
3578            let Some(collection) = self.collections.get_mut(&id) else {
3579                continue;
3580            };
3581
3582            let collection_unreadable =
3583                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
3584            let lag = if collection_unreadable {
3585                WallclockLag::Undefined
3586            } else {
3587                let lag = frontier_lag(&frontiers.write_frontier);
3588                WallclockLag::Seconds(lag.as_secs())
3589            };
3590
3591            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);
3592
3593            // No way to specify values as undefined in Prometheus metrics, so we use the
3594            // maximum value instead.
3595            let secs = lag.unwrap_seconds_or(u64::MAX);
3596            collection.wallclock_lag_metrics.observe(secs);
3597
3598            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
3599                continue;
3600            }
3601
3602            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
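                    // Bucket lag values by rounding up to the next power of
                    // two (in seconds).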
3603                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());
3604
3605                let instance_id = match &collection.extra_state {
3606                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3607                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3608                    CollectionStateExtra::None => None,
3609                };
3610                let workload_class = instance_id
3611                    .and_then(|id| self.instances.get(&id))
3612                    .and_then(|i| i.workload_class.clone());
3613                let labels = match workload_class {
3614                    Some(wc) => [("workload_class", wc.clone())].into(),
3615                    None => BTreeMap::new(),
3616                };
3617
3618                let key = (histogram_period, bucket, labels);
3619                *stash.entry(key).or_default() += Diff::ONE;
3620            }
3621        }
3622
3623        // Record lags to persist, if it's time.
3624        self.maybe_record_wallclock_lag();
3625    }
3626
3627    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
3628    /// last recording.
3629    ///
3630    /// We emit new introspection updates if the system time has passed into a new multiple of the
3631    /// recording interval (typically 1 minute) since the last refresh. The compute controller uses
3632    /// the same approach, ensuring that both controllers commit their lags at roughly the same
3633    /// time, avoiding confusion caused by inconsistencies.
3634    fn maybe_record_wallclock_lag(&mut self) {
3635        if self.read_only {
3636            return;
3637        }
3638
3639        let duration_trunc = |datetime: DateTime<_>, interval| {
3640            let td = TimeDelta::from_std(interval).ok()?;
3641            datetime.duration_trunc(td).ok()
3642        };
3643
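            // For example, with a 1-minute recording interval a wallclock time
            // of 12:34:56 truncates to 12:34:00; we record only once this
            // truncated value advances past `wallclock_lag_last_recorded`.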
3644        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3645        let now_dt = mz_ore::now::to_datetime((self.now)());
3646        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3647            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3648            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3649            duration_trunc(now_dt, *default).unwrap()
3650        });
3651        if now_trunc <= self.wallclock_lag_last_recorded {
3652            return;
3653        }
3654
3655        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3656
3657        let mut history_updates = Vec::new();
3658        let mut histogram_updates = Vec::new();
3659        let mut row_buf = Row::default();
3660        for frontiers in self.storage_collections.active_collection_frontiers() {
3661            let id = frontiers.id;
3662            let Some(collection) = self.collections.get_mut(&id) else {
3663                continue;
3664            };
3665
3666            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
3667            let row = Row::pack_slice(&[
3668                Datum::String(&id.to_string()),
3669                Datum::Null,
3670                max_lag.into_interval_datum(),
3671                Datum::TimestampTz(now_ts),
3672            ]);
3673            history_updates.push((row, Diff::ONE));
3674
3675            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3676                continue;
3677            };
3678
3679            for ((period, lag, labels), count) in std::mem::take(stash) {
3680                let mut packer = row_buf.packer();
3681                packer.extend([
3682                    Datum::TimestampTz(period.start),
3683                    Datum::TimestampTz(period.end),
3684                    Datum::String(&id.to_string()),
3685                    lag.into_uint64_datum(),
3686                ]);
3687                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3688                packer.push_dict(labels);
3689
3690                histogram_updates.push((row_buf.clone(), count));
3691            }
3692        }
3693
3694        if !history_updates.is_empty() {
3695            self.append_introspection_updates(
3696                IntrospectionType::WallclockLagHistory,
3697                history_updates,
3698            );
3699        }
3700        if !histogram_updates.is_empty() {
3701            self.append_introspection_updates(
3702                IntrospectionType::WallclockLagHistogram,
3703                histogram_updates,
3704            );
3705        }
3706
3707        self.wallclock_lag_last_recorded = now_trunc;
3708    }
3709
3710    /// Run periodic tasks.
3711    ///
3712    /// This method is invoked roughly once per second during normal operation. It is a good place
3713    /// for tasks that need to run periodically, such as state cleanup or updating metrics.
3714    fn maintain(&mut self) {
3715        self.update_frontier_introspection();
3716        self.refresh_wallclock_lag();
3717
3718        // Perform instance maintenance work.
3719        for instance in self.instances.values_mut() {
3720            instance.refresh_state_metrics();
3721        }
3722    }
3723}
3724
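// Maps each introspection collection to the way the collection manager maintains it:
// differentially managed collections are kept in sync with a desired state (which may require
// emitting retractions), while append-only collections only ever receive appended updates and
// are truncated through separate mechanisms (e.g. the status history retention below).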
3725impl From<&IntrospectionType> for CollectionManagerKind {
3726    fn from(value: &IntrospectionType) -> Self {
3727        match value {
3728            IntrospectionType::ShardMapping
3729            | IntrospectionType::Frontiers
3730            | IntrospectionType::ReplicaFrontiers
3731            | IntrospectionType::StorageSourceStatistics
3732            | IntrospectionType::StorageSinkStatistics
3733            | IntrospectionType::ComputeDependencies
3734            | IntrospectionType::ComputeOperatorHydrationStatus
3735            | IntrospectionType::ComputeMaterializedViewRefreshes
3736            | IntrospectionType::ComputeErrorCounts
3737            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3738
3739            IntrospectionType::SourceStatusHistory
3740            | IntrospectionType::SinkStatusHistory
3741            | IntrospectionType::PrivatelinkConnectionStatusHistory
3742            | IntrospectionType::ReplicaStatusHistory
3743            | IntrospectionType::ReplicaMetricsHistory
3744            | IntrospectionType::WallclockLagHistory
3745            | IntrospectionType::WallclockLagHistogram
3746            | IntrospectionType::PreparedStatementHistory
3747            | IntrospectionType::StatementExecutionHistory
3748            | IntrospectionType::SessionHistory
3749            | IntrospectionType::StatementLifecycleHistory
3750            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3751        }
3752    }
3753}
3754
3755/// Get the current rows in the given statistics table. This is used to bootstrap
3756/// the statistics tasks.
3757///
3758// TODO(guswynn): we need to be more careful about the update time we get here:
3759// <https://github.com/MaterializeInc/database-issues/issues/7564>
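//
// For illustration, with `mz_repr::Timestamp` values: if the collection's upper is `{5}`, the
// newest readable time is `4` (one `step_back` from the upper), so we snapshot at `as_of = 4`.
// If the upper is empty (the collection is closed) or still the minimum (nothing has ever been
// written), we return no rows.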
3760async fn snapshot_statistics<T>(
3761    id: GlobalId,
3762    upper: Antichain<T>,
3763    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3764) -> Vec<Row>
3765where
3766    T: Codec64 + From<EpochMillis> + TimestampManipulation,
3767{
3768    match upper.as_option() {
3769        Some(f) if f > &T::minimum() => {
3770            let as_of = f.step_back().unwrap();
3771
3772            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3773            snapshot
3774                .into_iter()
3775                .map(|(row, diff)| {
3776                    assert_eq!(diff, 1);
3777                    row
3778                })
3779                .collect()
3780        }
3781        // If the collection is closed or the frontier is still the minimum, we cannot
3782        // or don't need to read a snapshot (respectively).
3783        _ => Vec::new(),
3784    }
3785}
3786
3787async fn read_handle_for_snapshot<T>(
3788    persist: &PersistClientCache,
3789    id: GlobalId,
3790    metadata: &CollectionMetadata,
3791) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3792where
3793    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3794{
3795    let persist_client = persist
3796        .open(metadata.persist_location.clone())
3797        .await
3798        .unwrap();
3799
3800    // We create a new read handle every time someone requests a snapshot and then immediately
3801    // expire it, instead of keeping a read handle permanently in our state, to avoid having it
3802    // heartbeat continuously. The assumption is that calls to snapshot are rare enough that
3803    // always creating a fresh handle is worth the cost.
3804    let read_handle = persist_client
3805        .open_leased_reader::<SourceData, (), _, _>(
3806            metadata.data_shard,
3807            Arc::new(metadata.relation_desc.clone()),
3808            Arc::new(UnitSchema),
3809            Diagnostics {
3810                shard_name: id.to_string(),
3811                handle_purpose: format!("snapshot {}", id),
3812            },
3813            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3814        )
3815        .await
3816        .expect("invalid persist usage");
3817    Ok(read_handle)
3818}
3819
3820/// State maintained about individual collections.
3821#[derive(Debug)]
3822struct CollectionState<T: TimelyTimestamp> {
3823    /// The source of this collection's data.
3824    pub data_source: DataSource<T>,
3825
    /// The metadata of this collection, including the persist shard it is written to.
3826    pub collection_metadata: CollectionMetadata,
3827
    /// Additional state maintained for specific collection types, see [`CollectionStateExtra`].
3828    pub extra_state: CollectionStateExtra<T>,
3829
3830    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3831    wallclock_lag_max: WallclockLag,
3832    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
3833    /// introspection update.
3834    ///
3835    /// Keys are `(period, lag, labels)` triples, values are counts.
3836    ///
3837    /// If this is `None`, wallclock lag is not tracked for this collection.
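    ///
    /// For illustration (values made up): an entry keyed by
    /// `(period 12:00..13:00, lag bucket "<= 8s", {"workload_class": "batch"})` with a value of
    /// `3` means that three lag measurements fell into that bucket during that period under
    /// those labels.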
3838    wallclock_lag_histogram_stash: Option<
3839        BTreeMap<
3840            (
3841                WallclockLagHistogramPeriod,
3842                WallclockLag,
3843                BTreeMap<&'static str, String>,
3844            ),
3845            Diff,
3846        >,
3847    >,
3848    /// Frontier wallclock lag metrics tracked for this collection.
3849    wallclock_lag_metrics: WallclockLagMetrics,
3850}
3851
3852impl<T: TimelyTimestamp> CollectionState<T> {
3853    fn new(
3854        data_source: DataSource<T>,
3855        collection_metadata: CollectionMetadata,
3856        extra_state: CollectionStateExtra<T>,
3857        wallclock_lag_metrics: WallclockLagMetrics,
3858    ) -> Self {
3859        // Only collect wallclock lag histogram data for collections written by storage, to avoid
3860        // duplicate measurements. Collections written by other components (e.g. compute) have
3861        // their wallclock lags recorded by those components themselves.
3862        let wallclock_lag_histogram_stash = match &data_source {
3863            DataSource::Other => None,
3864            _ => Some(Default::default()),
3865        };
3866
3867        Self {
3868            data_source,
3869            collection_metadata,
3870            extra_state,
3871            wallclock_lag_max: WallclockLag::MIN,
3872            wallclock_lag_histogram_stash,
3873            wallclock_lag_metrics,
3874        }
3875    }
3876}
3877
3878/// Additional state that the controller maintains for select collection types.
3879#[derive(Debug)]
3880enum CollectionStateExtra<T: TimelyTimestamp> {
3881    Ingestion(IngestionState<T>),
3882    Export(ExportState<T>),
3883    None,
3884}
3885
3886/// State maintained about ingestions and ingestion exports.
3887#[derive(Debug)]
3888struct IngestionState<T: TimelyTimestamp> {
3889    /// Really only for keeping track of changes to the `derived_since`.
3890    pub read_capabilities: MutableAntichain<T>,
3891
3892    /// The current since frontier, derived from `write_frontier` using
3893    /// `hold_policy`.
3894    pub derived_since: Antichain<T>,
3895
3896    /// Holds that this ingestion (or ingestion export) has on its dependencies.
3897    pub dependency_read_holds: Vec<ReadHold<T>>,
3898
3899    /// Reported write frontier.
3900    pub write_frontier: Antichain<T>,
3901
3902    /// The policy that drives how we downgrade our read hold, that is, how we
3903    /// derive our since from our upper.
3904    ///
3905    /// This is a _storage-controller-internal_ policy that the controller uses to
3906    /// derive its own read hold on the collection. It should not be confused with
3907    /// any read policies that the adapter might install at [StorageCollections].
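    ///
    /// (As an illustration only: a policy along the lines of `ReadPolicy::lag_writes_by(..)`
    /// keeps the derived since a bounded distance behind the write frontier.)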
3908    pub hold_policy: ReadPolicy<T>,
3909
3910    /// The ID of the instance in which the ingestion is running.
3911    pub instance_id: StorageInstanceId,
3912
3913    /// Set of replica IDs on which this ingestion is hydrated.
3914    pub hydrated_on: BTreeSet<ReplicaId>,
3915}
3916
3917/// A description of a status history collection.
3918///
3919/// Used to inform partial truncation, see
3920/// [`collection_mgmt::partially_truncate_status_history`].
3921struct StatusHistoryDesc<K> {
3922    retention_policy: StatusHistoryRetentionPolicy,
3923    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
3924    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
3925}
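
/// How long entries of a status history collection are retained before truncation.
///
/// For illustration (values made up): `LastN(5)` keeps only the five most recent updates per
/// key, while `TimeWindow(Duration::from_secs(30 * 24 * 60 * 60))` keeps every update whose
/// `occurred_at` falls within the last 30 days.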
3926enum StatusHistoryRetentionPolicy {
3927    /// Truncates everything but the last N updates for each key.
3928    LastN(usize),
3929    /// Truncates everything older than the retention time window for each key.
3930    TimeWindow(Duration),
3931}
3932
3933fn source_status_history_desc(
3934    params: &StorageParameters,
3935) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3936    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3937    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3938    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3939    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3940
3941    StatusHistoryDesc {
3942        retention_policy: StatusHistoryRetentionPolicy::LastN(
3943            params.keep_n_source_status_history_entries,
3944        ),
3945        extract_key: Box::new(move |datums| {
3946            (
3947                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3948                if datums[replica_id_idx].is_null() {
3949                    None
3950                } else {
3951                    Some(
3952                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3953                            .expect("ReplicaId column"),
3954                    )
3955                },
3956            )
3957        }),
3958        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3959    }
3960}
3961
3962fn sink_status_history_desc(
3963    params: &StorageParameters,
3964) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3965    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3966    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3967    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3968    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3969
3970    StatusHistoryDesc {
3971        retention_policy: StatusHistoryRetentionPolicy::LastN(
3972            params.keep_n_sink_status_history_entries,
3973        ),
3974        extract_key: Box::new(move |datums| {
3975            (
3976                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
3977                if datums[replica_id_idx].is_null() {
3978                    None
3979                } else {
3980                    Some(
3981                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3982                            .expect("ReplicaId column"),
3983                    )
3984                },
3985            )
3986        }),
3987        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3988    }
3989}
3990
3991fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
3992    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
3993    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
3994    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3995
3996    StatusHistoryDesc {
3997        retention_policy: StatusHistoryRetentionPolicy::LastN(
3998            params.keep_n_privatelink_status_history_entries,
3999        ),
4000        extract_key: Box::new(move |datums| {
4001            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4002        }),
4003        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4004    }
4005}
4006
4007fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4008    let desc = &REPLICA_STATUS_HISTORY_DESC;
4009    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4010    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4011    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4012
4013    StatusHistoryDesc {
4014        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4015            params.replica_status_history_retention_window,
4016        ),
4017        extract_key: Box::new(move |datums| {
4018            (
4019                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4020                datums[process_idx].unwrap_uint64(),
4021            )
4022        }),
4023        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4024    }
4025}
4026
4027/// Replace one antichain with another, tracking the overall changes in the returned
/// `ChangeBatch`. The swap only happens if the original is `less_equal` the replacement;
/// otherwise `from` is left untouched and the returned batch is empty.
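///
/// A minimal sketch of the intended behavior (illustrative, using `u64` timestamps):
///
/// ```ignore
/// let mut from = Antichain::from_elem(2u64);
/// let update = swap_updates(&mut from, Antichain::from_elem(4u64));
/// assert_eq!(from, Antichain::from_elem(4u64));
/// // `update` now records +1 for the new frontier element 4 and -1 for the old element 2.
/// ```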
4028fn swap_updates<T: Timestamp>(
4029    from: &mut Antichain<T>,
4030    mut replace_with: Antichain<T>,
4031) -> ChangeBatch<T> {
4032    let mut update = ChangeBatch::new();
4033    if PartialOrder::less_equal(from, &replace_with) {
4034        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
4035        std::mem::swap(from, &mut replace_with);
4036        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
4037    }
4038    update
4039}