Skip to main content

mz_storage_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the storage controller trait.
11
12use std::any::Any;
13use std::collections::btree_map;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use crate::collection_mgmt::{
21    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
22};
23use crate::instance::{Instance, ReplicaConfig};
24use async_trait::async_trait;
25use chrono::{DateTime, DurationRound, TimeDelta, Utc};
26use derivative::Derivative;
27use differential_dataflow::lattice::Lattice;
28use futures::FutureExt;
29use futures::StreamExt;
30use itertools::Itertools;
31use mz_build_info::BuildInfo;
32use mz_cluster_client::client::ClusterReplicaLocation;
33use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_controller_types::dyncfgs::{
36    ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
37};
38use mz_ore::collections::CollectionExt;
39use mz_ore::metrics::MetricsRegistry;
40use mz_ore::now::NowFn;
41use mz_ore::task::AbortOnDropHandle;
42use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
43use mz_persist_client::batch::ProtoBatch;
44use mz_persist_client::cache::PersistClientCache;
45use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
46use mz_persist_client::critical::Opaque;
47use mz_persist_client::read::ReadHandle;
48use mz_persist_client::schema::CaESchema;
49use mz_persist_client::write::WriteHandle;
50use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::adt::timestamp::CheckedTimestamp;
53use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
54use mz_storage_client::client::{
55    AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
56    StatusUpdate, StorageCommand, StorageResponse, TableData,
57};
58use mz_storage_client::controller::{
59    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
60    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
61    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
62};
63use mz_storage_client::healthcheck::{
64    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
65    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
66};
67use mz_storage_client::metrics::StorageControllerMetrics;
68use mz_storage_client::statistics::{
69    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
70};
71use mz_storage_client::storage_collections::StorageCollections;
72use mz_storage_types::configuration::StorageConfiguration;
73use mz_storage_types::connections::ConnectionContext;
74use mz_storage_types::connections::inline::InlinedConnection;
75use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
76use mz_storage_types::errors::CollectionMissing;
77use mz_storage_types::instances::StorageInstanceId;
78use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
79use mz_storage_types::parameters::StorageParameters;
80use mz_storage_types::read_holds::ReadHold;
81use mz_storage_types::read_policy::ReadPolicy;
82use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
83use mz_storage_types::sources::{
84    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
85    SourceExport, SourceExportDataConfig,
86};
87use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
88use mz_txn_wal::metrics::Metrics as TxnMetrics;
89use mz_txn_wal::txn_read::TxnsRead;
90use mz_txn_wal::txns::TxnsHandle;
91use timely::order::PartialOrder;
92use timely::progress::frontier::MutableAntichain;
93use timely::progress::{Antichain, ChangeBatch};
94use tokio::sync::watch::{Sender, channel};
95use tokio::sync::{mpsc, oneshot};
96use tokio::time::MissedTickBehavior;
97use tokio::time::error::Elapsed;
98use tracing::{debug, info, warn};
99
100mod collection_mgmt;
101mod history;
102mod instance;
103mod persist_handles;
104mod rtr;
105mod statistics;
106
/// A oneshot ingestion that has been issued to a cluster but whose result has
/// not yet been delivered back to the caller.
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to provide results of the ingestion.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// Cluster currently running this ingestion.
    cluster_id: StorageInstanceId,
}
116
117impl PendingOneshotIngestion {
118    /// Consume the pending ingestion, responding with a cancelation message.
119    ///
120    /// TODO(cf2): Refine these error messages so they're not stringly typed.
121    pub(crate) fn cancel(self) {
122        (self.result_tx)(vec![Err("canceled".to_string())])
123    }
124}
125
/// A storage controller for a storage instance.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller {
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,

    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,

    /// Collections maintained by the storage controller.
    ///
    /// This collection only grows, although individual collections may be rendered unusable.
    /// This is to prevent the re-binding of identifiers to other descriptions.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState>,

    /// Map from IDs of objects that have been dropped to replicas we are still
    /// expecting DroppedId messages from. This is cleared out once all replicas
    /// have responded.
    ///
    /// We use this only to catch problems in the protocol between controller
    /// and replicas, for example we can differentiate between late messages for
    /// objects that have already been dropped and unexpected (read erroneous)
    /// messages from the replica.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Write handle for table shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker,
    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<Timestamp>,
    /// Metrics for the txn-wal system.
    txns_metrics: Arc<TxnMetrics>,
    /// Storage responses (paired with the replica they originated from, if
    /// any) that have been received but not yet processed.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse)>,
    /// Channel for sending table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Channel for receiving table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Closures that can be used to send responses from oneshot ingestions.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections
    pub(crate) collection_manager: collection_mgmt::CollectionManager,

    /// Tracks which collection is responsible for which [`IntrospectionType`].
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for tasks that drive updating introspection collections. Dropping
    /// this will make sure that any tasks (or other resources) will stop when
    /// needed.
    // TODO(aljoscha): Should these live somewhere else?
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    // The following two fields must always be locked in order.
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s, as well
    /// as webhook statistics.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// A way to update the statistics interval in the statistics tasks.
    statistics_interval_sender: Sender<Duration>,

    /// Clients for all known storage instances.
    instances: BTreeMap<StorageInstanceId, Instance>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration to apply to newly provisioned instances, and use during purification.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections
    persist: Arc<PersistClientCache>,
    /// Metrics of the Storage controller
    metrics: StorageControllerMetrics,
    /// `(read, write)` frontiers that have been recorded in the `Frontiers` collection, kept to be
    /// able to retract old rows.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    /// Write frontiers that have been recorded in the `ReplicaFrontiers` collection, kept to be
    /// able to retract old rows.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Timestamp>>,

    /// A function that computes the lag between the given time and wallclock time.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to a [StorageCollections].
    storage_collections: Arc<dyn StorageCollections + Send + Sync>,
    /// Migrated storage collections that can be written even in read only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Shared transmit channel for replicas to send responses.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
    /// Receive end for replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse)>,

    /// Background task run at startup to warm persist state.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}
239
240/// Warm up persist state for `shard_ids` in a background task.
241///
242/// With better parallelism during startup this would likely be unnecessary, but empirically we see
243/// some nice speedups with this relatively simple function.
244fn warm_persist_state_in_background(
245    client: PersistClient,
246    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
247) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
248    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
249    const MAX_CONCURRENT_WARMS: usize = 16;
250    let logic = async move {
251        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
252            .map(|shard_id| {
253                let client = client.clone();
254                async move {
255                    client
256                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
257                            shard_id,
258                            Arc::new(RelationDesc::empty()),
259                            Arc::new(UnitSchema),
260                            true,
261                            Diagnostics::from_purpose("warm persist load state"),
262                        )
263                        .await
264                }
265            })
266            .buffer_unordered(MAX_CONCURRENT_WARMS)
267            .collect()
268            .await;
269        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
270        fetchers
271    };
272    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
273}
274
275#[async_trait(?Send)]
276impl StorageController for Controller {
277    fn initialization_complete(&mut self) {
278        self.reconcile_dangling_statistics();
279        self.initialized = true;
280
281        for instance in self.instances.values_mut() {
282            instance.send(StorageCommand::InitializationComplete);
283        }
284    }
285
286    fn update_parameters(&mut self, config_params: StorageParameters) {
287        self.storage_collections
288            .update_parameters(config_params.clone());
289
290        // We serialize the dyncfg updates in StorageParameters, but configure
291        // persist separately.
292        self.persist.cfg().apply_from(&config_params.dyncfg_updates);
293
294        for instance in self.instances.values_mut() {
295            let params = Box::new(config_params.clone());
296            instance.send(StorageCommand::UpdateConfiguration(params));
297        }
298        self.config.update(config_params);
299        self.statistics_interval_sender
300            .send_replace(self.config.parameters.statistics_interval);
301        self.collection_manager.update_user_batch_duration(
302            self.config
303                .parameters
304                .user_storage_managed_collections_batch_duration,
305        );
306    }
307
    /// Get the current configuration.
    ///
    /// This is the storage configuration applied to newly provisioned
    /// instances and used during purification.
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }
312
    /// Returns the metadata for the identified collection, delegating to the
    /// underlying [`StorageCollections`].
    ///
    /// # Errors
    ///
    /// Returns [`CollectionMissing`] if the underlying lookup fails.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        self.storage_collections.collection_metadata(id)
    }
316
317    fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, StorageError> {
318        let collection = self.collection(collection_id)?;
319
320        let instance_id = match &collection.data_source {
321            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
322            DataSource::IngestionExport { ingestion_id, .. } => {
323                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
324
325                let instance_id = match &ingestion_state.data_source {
326                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
327                    _ => unreachable!("SourceExport must only refer to primary source"),
328                };
329
330                instance_id
331            }
332            _ => return Ok(true),
333        };
334
335        let instance = self.instances.get(&instance_id).ok_or_else(|| {
336            StorageError::IngestionInstanceMissing {
337                storage_instance_id: instance_id,
338                ingestion_id: collection_id,
339            }
340        })?;
341
342        if instance.replica_ids().next().is_none() {
343            // Ingestions on zero-replica clusters are always considered
344            // hydrated.
345            return Ok(true);
346        }
347
348        match &collection.extra_state {
349            CollectionStateExtra::Ingestion(ingestion_state) => {
350                // An ingestion is hydrated if it is hydrated on at least one replica.
351                Ok(ingestion_state.hydrated_on.len() >= 1)
352            }
353            CollectionStateExtra::Export(_) => {
354                // For now, sinks are always considered hydrated. We rely on
355                // them starting up "instantly" and don't wait for them when
356                // checking hydration status of a replica.  TODO(sinks): base
357                // this off of the sink shard's frontier?
358                Ok(true)
359            }
360            CollectionStateExtra::None => {
361                // For now, objects that are not ingestions are always
362                // considered hydrated. This is tables and webhooks, as of
363                // today.
364                Ok(true)
365            }
366        }
367    }
368
369    #[mz_ore::instrument(level = "debug")]
370    fn collections_hydrated_on_replicas(
371        &self,
372        target_replica_ids: Option<Vec<ReplicaId>>,
373        target_cluster_id: &StorageInstanceId,
374        exclude_collections: &BTreeSet<GlobalId>,
375    ) -> Result<bool, StorageError> {
376        // If an empty set of replicas is provided there can be
377        // no collections on them, we'll count this as hydrated.
378        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
379            return Ok(true);
380        }
381
382        // If target_replica_ids is provided, use it as the set of target
383        // replicas. Otherwise check for hydration on any replica.
384        let target_replicas: Option<BTreeSet<ReplicaId>> =
385            target_replica_ids.map(|ids| ids.into_iter().collect());
386
387        let mut all_hydrated = true;
388        for (collection_id, collection_state) in self.collections.iter() {
389            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
390                continue;
391            }
392            let hydrated = match &collection_state.extra_state {
393                CollectionStateExtra::Ingestion(state) => {
394                    if &state.instance_id != target_cluster_id {
395                        continue;
396                    }
397                    match &target_replicas {
398                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
399                        None => {
400                            // Not target replicas, so check that it's hydrated
401                            // on at least one replica.
402                            state.hydrated_on.len() >= 1
403                        }
404                    }
405                }
406                CollectionStateExtra::Export(_) => {
407                    // For now, sinks are always considered hydrated. We rely on
408                    // them starting up "instantly" and don't wait for them when
409                    // checking hydration status of a replica.  TODO(sinks):
410                    // base this off of the sink shard's frontier?
411                    true
412                }
413                CollectionStateExtra::None => {
414                    // For now, objects that are not ingestions are always
415                    // considered hydrated. This is tables and webhooks, as of
416                    // today.
417                    true
418                }
419            };
420            if !hydrated {
421                tracing::info!(%collection_id, "collection is not hydrated on any replica");
422                all_hydrated = false;
423                // We continue with our loop instead of breaking out early, so
424                // that we log all non-hydrated replicas.
425            }
426        }
427        Ok(all_hydrated)
428    }
429
430    fn collection_frontiers(
431        &self,
432        id: GlobalId,
433    ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>), CollectionMissing> {
434        let frontiers = self.storage_collections.collection_frontiers(id)?;
435        Ok((frontiers.implied_capability, frontiers.write_frontier))
436    }
437
438    fn collections_frontiers(
439        &self,
440        mut ids: Vec<GlobalId>,
441    ) -> Result<Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>, CollectionMissing>
442    {
443        let mut result = vec![];
444        // In theory, we could pull all our frontiers from storage collections...
445        // but in practice those frontiers may not be identical. For historical reasons, we use the
446        // locally-tracked frontier for sinks but the storage-collections-maintained frontier for
447        // sources.
448        ids.retain(|&id| match self.export(id) {
449            Ok(export) => {
450                result.push((
451                    id,
452                    export.input_hold().since().clone(),
453                    export.write_frontier.clone(),
454                ));
455                false
456            }
457            Err(_) => true,
458        });
459        result.extend(
460            self.storage_collections
461                .collections_frontiers(ids)?
462                .into_iter()
463                .map(|frontiers| {
464                    (
465                        frontiers.id,
466                        frontiers.implied_capability,
467                        frontiers.write_frontier,
468                    )
469                }),
470        );
471
472        Ok(result)
473    }
474
    /// Returns the metadata of all active collections, delegating to the
    /// underlying [`StorageCollections`].
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }
478
479    fn active_ingestion_exports(
480        &self,
481        instance_id: StorageInstanceId,
482    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
483        let active_storage_collections: BTreeMap<_, _> = self
484            .storage_collections
485            .active_collection_frontiers()
486            .into_iter()
487            .map(|c| (c.id, c))
488            .collect();
489
490        let active_exports = self.instances[&instance_id]
491            .active_ingestion_exports()
492            .filter(move |id| {
493                let frontiers = active_storage_collections.get(id);
494                match frontiers {
495                    Some(frontiers) => !frontiers.write_frontier.is_empty(),
496                    None => {
497                        // Not "active", so we don't care here.
498                        false
499                    }
500                }
501            });
502
503        Box::new(active_exports)
504    }
505
    /// Checks that a collection with the given `id` exists, delegating to the
    /// underlying [`StorageCollections`].
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError> {
        self.storage_collections.check_exists(id)
    }
509
510    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
511        let metrics = self.metrics.for_instance(id);
512        let mut instance = Instance::new(
513            workload_class,
514            metrics,
515            self.now.clone(),
516            self.instance_response_tx.clone(),
517        );
518        if self.initialized {
519            instance.send(StorageCommand::InitializationComplete);
520        }
521        if !self.read_only {
522            instance.send(StorageCommand::AllowWrites);
523        }
524
525        let params = Box::new(self.config.parameters.clone());
526        instance.send(StorageCommand::UpdateConfiguration(params));
527
528        let old_instance = self.instances.insert(id, instance);
529        assert_none!(old_instance, "storage instance {id} already exists");
530    }
531
532    fn drop_instance(&mut self, id: StorageInstanceId) {
533        let instance = self.instances.remove(&id);
534        assert!(instance.is_some(), "storage instance {id} does not exist");
535    }
536
537    fn update_instance_workload_class(
538        &mut self,
539        id: StorageInstanceId,
540        workload_class: Option<String>,
541    ) {
542        let instance = self
543            .instances
544            .get_mut(&id)
545            .unwrap_or_else(|| panic!("instance {id} does not exist"));
546
547        instance.workload_class = workload_class;
548    }
549
550    fn connect_replica(
551        &mut self,
552        instance_id: StorageInstanceId,
553        replica_id: ReplicaId,
554        location: ClusterReplicaLocation,
555    ) {
556        let instance = self
557            .instances
558            .get_mut(&instance_id)
559            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
560
561        let config = ReplicaConfig {
562            build_info: self.build_info,
563            location,
564            grpc_client: self.config.parameters.grpc_client.clone(),
565        };
566        instance.add_replica(replica_id, config);
567    }
568
    /// Removes `replica_id` from `instance_id`, synthesizing `Paused` status
    /// updates for the sources and sinks that were running on that replica.
    /// The updates are only written out when not in read-only mode.
    ///
    /// # Panics
    ///
    /// Panics if no instance with `instance_id` exists, or if the instance
    /// references an ingestion unknown to the controller.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        // All synthesized updates share the same shape: a `Paused` status
        // attributed to the dropped replica, with a hint explaining why.
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            // Stop expecting DroppedId messages from this replica; if it was
            // the last one outstanding, forget the dropped object entirely.
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // An "old style" ingestion keeps its remap data in a collection
            // distinct from the source itself.
            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                // NOTE(aljoscha): We filter out the remap collection for old style
                // ingestions because it doesn't get any status updates about it from the
                // replica side. So we don't want to synthesize a 'paused' status here.
                // New style ingestion do, since the source itself contains the remap data.
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            // Same DroppedId bookkeeping as for ingestions above.
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        // In read-only mode we must not affect changes to external systems,
        // so the synthesized status updates are dropped on the floor.
        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }
655
656    async fn evolve_nullability_for_bootstrap(
657        &mut self,
658        storage_metadata: &StorageMetadata,
659        collections: Vec<(GlobalId, RelationDesc)>,
660    ) -> Result<(), StorageError> {
661        let persist_client = self
662            .persist
663            .open(self.persist_location.clone())
664            .await
665            .unwrap();
666
667        for (global_id, relation_desc) in collections {
668            let shard_id = storage_metadata.get_collection_shard(global_id)?;
669            let diagnostics = Diagnostics {
670                shard_name: global_id.to_string(),
671                handle_purpose: "evolve nullability for bootstrap".to_string(),
672            };
673            let latest_schema = persist_client
674                .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
675                .await
676                .expect("invalid persist usage");
677            let Some((schema_id, current_schema, _)) = latest_schema else {
678                tracing::debug!(?global_id, "no schema registered");
679                continue;
680            };
681            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");
682
683            let diagnostics = Diagnostics {
684                shard_name: global_id.to_string(),
685                handle_purpose: "evolve nullability for bootstrap".to_string(),
686            };
687            let evolve_result = persist_client
688                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
689                    shard_id,
690                    schema_id,
691                    &relation_desc,
692                    &UnitSchema,
693                    diagnostics,
694                )
695                .await
696                .expect("invalid persist usage");
697            match evolve_result {
698                CaESchema::Ok(_) => (),
699                CaESchema::ExpectedMismatch {
700                    schema_id,
701                    key,
702                    val: _,
703                } => {
704                    return Err(StorageError::PersistSchemaEvolveRace {
705                        global_id,
706                        shard_id,
707                        schema_id,
708                        relation_desc: key,
709                    });
710                }
711                CaESchema::Incompatible => {
712                    return Err(StorageError::PersistInvalidSchemaEvolve {
713                        global_id,
714                        shard_id,
715                    });
716                }
717            };
718        }
719
720        Ok(())
721    }
722
    /// Create and "execute" the described collection.
    ///
    /// "Execute" is in scare quotes because what executing a collection means
    /// varies widely based on the type of collection you're creating.
    ///
    /// The general process creating a collection undergoes is:
    /// 1. Enrich the description we get from the user with metadata that only
    ///    the storage controller knows about. This is mostly a matter of
    ///    separating concerns.
    /// 2. Generate write and read persist handles for the collection.
    /// 3. Store the collection's metadata in the appropriate field.
    /// 4. "Execute" the collection. What that means is contingent on the type of
    ///    collection, so consult the code for more details.
    ///
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods. @guswynn observes
    // that a number of these operations could be moved into fns on
    // `DataSource`.
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        // Let the `StorageCollections` side register everything first; it is
        // the source of truth for frontiers and shard mappings.
        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts,
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // At this point we're connected to all the collection shards in persist. Our warming task
        // is no longer useful, so abort it if it's still running.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state. We must not:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        //
        // `dedup` removes entries that are identical in both id and
        // description, so any adjacent pair that still shares an id must
        // disagree on its description — that is the error case below.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard(id)?;

                // If the shard is being managed by txn-wal (initially, tables), then we need to
                // pass along the shard id for the txns shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open persist handles for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Reborrow the `&mut self` as immutable, as all the concurrent work to be processed in
        // this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError>| {
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError>((id, description, write, metadata))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered` (which
            // `buffer_unordered` uses underneath):
            // - One is captured here <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a `FuturesUnordered`
            // stream attempts to obtain an async mutex that is also obtained in the futures
            // being polled.
            //
            // Both of these could potentially be issues in all usages of `buffer_unordered` in
            // this method, so we stick to the standard advice: only use `try_collect` or
            // `collect`!
            .try_collect()
            .await?;

        // The set of collections that we should render at the end of this
        // function.
        let mut to_execute = BTreeSet::new();
        // New collections that are being created; this is distinct from the set
        // of collections we plan to execute because
        // `DataSource::IngestionExport` is added as a new collection, but is
        // not executed directly.
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        // Reorder in dependency order.
        to_register.sort_by_key(|(id, ..)| *id);

        // Register tables first, but register them in reverse order since earlier tables
        // can depend on later tables.
        //
        // Note: We could do more complex sorting to avoid the allocations, but IMO it's
        // easier to reason about it this way.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics need a level of indirection so we can mutably borrow
        // `self` when registering collections and when we are inserting
        // statistics.
        let mut new_webhook_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            // A collection counts as txn-wal-managed when it has a txns shard,
            // except that in read-only mode migrated collections are excluded
            // (they go through the read-only table worker instead; see below).
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            // Determine if this collection has another dependency.
            let storage_dependencies = self.determine_collection_dependencies(id, &description)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            // The join (maximum) of all dependency sinces; an empty
            // `dependency_read_holds` leaves this at `Timestamp::MIN`.
            let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            let data_source = description.data_source;

            // Assert some invariants.
            //
            // TODO(alter_table): Include Tables (is_in_txns) in this check. After
            // supporting ALTER TABLE, it's now possible for a table to have a dependency
            // and thus run this check. But tables are managed by txn-wal and thus the
            // upper of the shard's `write_handle` generally isn't the logical upper of
            // the shard. Instead we need to thread through the upper of the `txn` shard
            // here so we can check this invariant.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                // As the halt message says, this can happen when trying to come
                // up in read-only mode and the current read-write
                // environmentd/controller deletes a collection. We exit
                // gracefully, which means we'll get restarted and get to try
                // again.
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                // If the dependency is between a remap shard and a source (and
                // not a "primary" dependency), the dependency since cannot be
                // beyond the dependent (our) upper unless the collection is
                // new. If the since is allowed to "catch up" to the upper, that
                // is `upper <= since`, a restarting ingestion cannot
                // differentiate between updates that have already been written
                // out to the backing persist shard and updates that have yet to
                // be written. We would write duplicate updates.
                //
                // If this check fails, it means that the read hold installed on
                // the dependency was probably not upheld –– if it were, the
                // dependency's since could not have advanced as far the
                // dependent's upper.
                //
                // We don't care about the dependency since when the write
                // frontier is empty. In that case, no-one can write down any
                // more updates.
                if description.primary.is_none() {
                    mz_ore::soft_assert_or_log!(
                        write_frontier.elements() == &[Timestamp::MIN]
                            || write_frontier.is_empty()
                            || PartialOrder::less_than(&dependency_since, write_frontier),
                        "dependency since has advanced past dependent ({id}) upper \n
                            dependent ({id}): upper {:?} \n
                            dependency since {:?} \n
                            dependency read holds: {:?}",
                        write_frontier,
                        dependency_since,
                        dependency_read_holds,
                    );
                }
            }

            // Perform data source-specific setup.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    // We always register the collection with the collection manager,
                    // regardless of read-only mode. The CollectionManager itself is
                    // aware of read-only mode and will not attempt to write before told
                    // to do so.
                    //
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    // This collection of statistics is periodically aggregated into
                    // `source_statistics`.
                    new_webhook_statistic_entries.insert(id);
                    // Register the collection so our manager knows about it.
                    //
                    // NOTE: Maybe this shouldn't be in the collection manager,
                    // and collection manager should only be responsible for
                    // built-in introspection collections?
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            // Record the ingestion's cluster ID for the
                            // ingestion export. This way we always have a
                            // record of it, even if the ingestion's collection
                            // description disappears.
                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the source export doesn't do anything, ensure we execute the source instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Timestamp::MIN),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);
                }
                DataSource::Table => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    // Tables are registered with the table worker in one batch
                    // after this loop, so just collect them here.
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    // NOTE(review): this recomputes the same join as the outer
                    // `dependency_since` above; presumably kept local so the
                    // outer value stays movable — confirm before refactoring.
                    let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Timestamp::MIN),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    // Sinks are expected to carry exactly two dependency
                    // holds; the first is treated as the sink's self-hold.
                    // NOTE(review): ordering is assumed to match
                    // `determine_collection_dependencies` — confirm.
                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            // Webhooks don't run on clusters/replicas, so we initialize their
            // statistics collection here.
            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }

            // Sources and sinks only have statistics in the collection when
            // there is a replica that is reporting them. No need to initialize
            // here.
        }

        // Register the tables all in one batch.
        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we use a special read-only table worker
                // that allows writing to migrated tables and will continually
                // bump their shard upper so that it tracks the txn shard upper.
                // We do this, so that they remain readable at a recent
                // timestamp, which in turn allows dataflows that depend on them
                // to (re-)hydrate.
                //
                // We only want to register migrated tables, though, and leave
                // existing tables out/never write to them in read-only mode.
                //
                // Note: both branches issue the same `register` call; this one
                // merely restricts the set of tables first.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // TODO(guswynn): perform the io in this final section concurrently.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    // In read-only mode, only run ingestions whose connection
                    // supports it and only when the 0dt-sources flag is on.
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }
1199
1200    fn check_alter_ingestion_source_desc(
1201        &mut self,
1202        ingestion_id: GlobalId,
1203        source_desc: &SourceDesc,
1204    ) -> Result<(), StorageError> {
1205        let source_collection = self.collection(ingestion_id)?;
1206        let data_source = &source_collection.data_source;
1207        match &data_source {
1208            DataSource::Ingestion(cur_ingestion) => {
1209                cur_ingestion
1210                    .desc
1211                    .alter_compatible(ingestion_id, source_desc)?;
1212            }
1213            o => {
1214                tracing::info!(
1215                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1216                    o
1217                );
1218                Err(AlterError { id: ingestion_id })?
1219            }
1220        }
1221
1222        Ok(())
1223    }
1224
1225    async fn alter_ingestion_source_desc(
1226        &mut self,
1227        ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
1228    ) -> Result<(), StorageError> {
1229        let mut ingestions_to_run = BTreeSet::new();
1230
1231        for (id, new_desc) in ingestion_ids {
1232            let collection = self
1233                .collections
1234                .get_mut(&id)
1235                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1236
1237            match &mut collection.data_source {
1238                DataSource::Ingestion(ingestion) => {
1239                    if ingestion.desc != new_desc {
1240                        tracing::info!(
1241                            from = ?ingestion.desc,
1242                            to = ?new_desc,
1243                            "alter_ingestion_source_desc, updating"
1244                        );
1245                        ingestion.desc = new_desc;
1246                        ingestions_to_run.insert(id);
1247                    }
1248                }
1249                o => {
1250                    tracing::warn!("alter_ingestion_source_desc called on {:?}", o);
1251                    Err(StorageError::IdentifierInvalid(id))?;
1252                }
1253            }
1254        }
1255
1256        for id in ingestions_to_run {
1257            self.run_ingestion(id)?;
1258        }
1259        Ok(())
1260    }
1261
1262    async fn alter_ingestion_connections(
1263        &mut self,
1264        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1265    ) -> Result<(), StorageError> {
1266        let mut ingestions_to_run = BTreeSet::new();
1267
1268        for (id, conn) in source_connections {
1269            let collection = self
1270                .collections
1271                .get_mut(&id)
1272                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1273
1274            match &mut collection.data_source {
1275                DataSource::Ingestion(ingestion) => {
1276                    // If the connection hasn't changed, there's no sense in
1277                    // re-rendering the dataflow.
1278                    if ingestion.desc.connection != conn {
1279                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1280                        ingestion.desc.connection = conn;
1281                        ingestions_to_run.insert(id);
1282                    } else {
1283                        tracing::warn!(
1284                            "update_source_connection called on {id} but the \
1285                            connection was the same"
1286                        );
1287                    }
1288                }
1289                o => {
1290                    tracing::warn!("update_source_connection called on {:?}", o);
1291                    Err(StorageError::IdentifierInvalid(id))?;
1292                }
1293            }
1294        }
1295
1296        for id in ingestions_to_run {
1297            self.run_ingestion(id)?;
1298        }
1299        Ok(())
1300    }
1301
    /// Updates the `SourceExportDataConfig` of the given source exports.
    ///
    /// Each config lives in two places: on the export's own `CollectionState`
    /// and inside the owning ingestion's `source_exports` map. Both are
    /// updated here, and every ingestion whose stored config actually changed
    /// is re-run afterwards.
    ///
    /// Returns an error if an id is missing, is not an `IngestionExport`, or
    /// its owning collection is not an `Ingestion`.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError> {
        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // We need to adjust the data config on the CollectionState for
            // the source export collection directly
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    // NOTE(review): the export's copy is overwritten even if
                    // the ingestion's copy below turns out to be unchanged
                    // (the warn branch) — confirm this asymmetry is intended.
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // We also need to adjust the data config on the CollectionState of the
            // Ingestion that the export is associated with.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // If the data config hasn't changed, there's no sense in
                    // re-rendering the dataflow.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                                    export {source_export_id} of {ingestion_id} but \
                                    the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        // Re-render only the ingestions whose configs changed.
        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }
1370
    /// Evolves the schema of an existing table by registering its data shard
    /// under a new `GlobalId` with the new `RelationDesc`.
    ///
    /// `existing_collection` must name a `DataSource::Table` collection;
    /// otherwise `StorageError::IdentifierInvalid` is returned. On success a
    /// `CollectionState` for `new_collection` (sharing the existing data
    /// shard) is inserted, the table worker registers the new write handle at
    /// `register_ts`, and the id-to-shard mapping is appended.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Timestamp,
    ) -> Result<(), StorageError> {
        // Validate the existing collection and let `StorageCollections`
        // perform the actual schema evolution; on success, capture the data
        // shard that the new collection will share.
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if existing.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Let StorageCollections know!
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        // NOTE(review): the handle is opened with the *existing* collection's
        // id but registered below under `new_collection` — confirm the id
        // here is only used for diagnostics.
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            // TODO(alter_table): Support schema evolution on sources.
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        // TODO(alter_table): Support schema evolution on sources.
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            DataSource::Table,
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        // Great! We have successfully evolved the schema of our Table, now we need to update our
        // in-memory data structures.
        self.collections.insert(new_collection, collection_state);

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }
1449
1450    fn export(&self, id: GlobalId) -> Result<&ExportState, StorageError> {
1451        self.collections
1452            .get(&id)
1453            .and_then(|c| match &c.extra_state {
1454                CollectionStateExtra::Export(state) => Some(state),
1455                _ => None,
1456            })
1457            .ok_or(StorageError::IdentifierMissing(id))
1458    }
1459
1460    fn export_mut(&mut self, id: GlobalId) -> Result<&mut ExportState, StorageError> {
1461        self.collections
1462            .get_mut(&id)
1463            .and_then(|c| match &mut c.extra_state {
1464                CollectionStateExtra::Export(state) => Some(state),
1465                _ => None,
1466            })
1467            .ok_or(StorageError::IdentifierMissing(id))
1468    }
1469
1470    /// Create a oneshot ingestion.
1471    async fn create_oneshot_ingestion(
1472        &mut self,
1473        ingestion_id: uuid::Uuid,
1474        collection_id: GlobalId,
1475        instance_id: StorageInstanceId,
1476        request: OneshotIngestionRequest,
1477        result_tx: OneshotResultCallback<ProtoBatch>,
1478    ) -> Result<(), StorageError> {
1479        let collection_meta = self
1480            .collections
1481            .get(&collection_id)
1482            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1483            .collection_metadata
1484            .clone();
1485        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1486            // TODO(cf2): Refine this error.
1487            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1488        })?;
1489        let oneshot_cmd = RunOneshotIngestion {
1490            ingestion_id,
1491            collection_id,
1492            collection_meta,
1493            request,
1494        };
1495
1496        if !self.read_only {
1497            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1498            let pending = PendingOneshotIngestion {
1499                result_tx,
1500                cluster_id: instance_id,
1501            };
1502            let novel = self
1503                .pending_oneshot_ingestions
1504                .insert(ingestion_id, pending);
1505            assert_none!(novel);
1506            Ok(())
1507        } else {
1508            Err(StorageError::ReadOnly)
1509        }
1510    }
1511
1512    fn cancel_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) -> Result<(), StorageError> {
1513        if self.read_only {
1514            return Err(StorageError::ReadOnly);
1515        }
1516
1517        let pending = self
1518            .pending_oneshot_ingestions
1519            .remove(&ingestion_id)
1520            .ok_or_else(|| {
1521                // TODO(cf2): Refine this error.
1522                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1523            })?;
1524
1525        match self.instances.get_mut(&pending.cluster_id) {
1526            Some(instance) => {
1527                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1528            }
1529            None => {
1530                mz_ore::soft_panic_or_log!(
1531                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1532                    ingestion_id,
1533                    pending.cluster_id,
1534                );
1535            }
1536        }
1537        // Respond to the user that the request has been canceled.
1538        pending.cancel();
1539
1540        Ok(())
1541    }
1542
    /// Alter an existing export (sink) to match `new_description`, re-issuing
    /// a `RunSink` command to the cluster that hosts it.
    ///
    /// Acquires fresh read holds on the sinked collection and the export
    /// itself, swaps them into the export's state, and only then sends the
    /// new sink command.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription,
    ) -> Result<(), StorageError> {
        let from_id = new_description.sink.from;

        // Acquire read holds at StorageCollections to ensure that the
        // sinked collection is not dropped while we're sinking it.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Check whether the sink's write frontier is beyond the read hold we got
        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        // Build the replacement state: carry over the existing capabilities
        // and frontiers, but install the freshly acquired read holds.
        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        // For `ALTER SINK`, the snapshot should only occur if the sink has not made any progress.
        // This prevents unnecessary decoding in the sink.
        // If the write frontier of the sink is strictly larger than its read hold, it must have at
        // least written out its snapshot, and we can skip reading it; otherwise assume we may have
        // to replay from the beginning.
        // TODO(database-issues#10002): unify this with run_export, if possible
        let with_snapshot = new_description.sink.with_snapshot
            && !PartialOrder::less_than(&new_description.sink.as_of, &cur_export.write_frontier);

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: new_description.sink.commit_interval,
            },
        };

        // Fetch the client for this export's cluster.
        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }
1619
    /// Swap in the given `StorageSinkConnection` for each export and re-run
    /// the corresponding sink dataflows on their clusters.
    ///
    /// Commands are built and validated for all exports first; in-memory
    /// export state is only updated after every fallible step has succeeded.
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError> {
        // Group the staged commands by cluster so we can resolve each
        // instance once and send its commands together.
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand, ExportDescription)>>::new();

        for (id, connection) in exports {
            // We stage changes in new_export_description and then apply all
            // updates to exports at the end.
            //
            // We don't just go ahead and clone the `ExportState` itself and
            // update that because `ExportState` is not clone, because it holds
            // a `ReadHandle` and cloning that would cause additional work for
            // whoever guarantees those read holds.
            let (mut new_export_description, as_of): (ExportDescription, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Ensure compatibility
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    // Here we are about to send a RunSinkCommand with the current read capability
                    // held by this sink. However, clusters are already running a version of the
                    // sink and nothing guarantees that by the time this command arrives at the
                    // clusters they won't have made additional progress such that this read
                    // capability is invalidated.
                    // The solution to this problem is for the controller to track specific
                    // executions of dataflows such that it can track the shutdown of the current
                    // instance and the initialization of the new instance separately and ensure
                    // read holds are held for the correct amount of time.
                    // TODO(petrosagg): change the controller to explicitly track dataflow executions
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                    commit_interval: new_export_description.sink.commit_interval,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the client for this export's cluster.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update state only after all possible errors have occurred.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }
1731
1732    // Dropping a table takes roughly the following flow:
1733    //
1734    // First determine if this is a TableWrites table or a source-fed table (an IngestionExport):
1735    //
1736    // If this is a TableWrites table:
1737    //   1. We remove the table from the persist table write worker.
1738    //   2. The table removal is awaited in an async task.
1739    //   3. A message is sent to the storage controller that the table has been removed from the
1740    //      table write worker.
1741    //   4. The controller drains all table drop messages during `process`.
1742    //   5. `process` calls `drop_sources` with the dropped tables.
1743    //
1744    // If this is an IngestionExport table:
1745    //   1. We validate the ids and then call drop_sources_unvalidated to proceed dropping.
1746    fn drop_tables(
1747        &mut self,
1748        storage_metadata: &StorageMetadata,
1749        identifiers: Vec<GlobalId>,
1750        ts: Timestamp,
1751    ) -> Result<(), StorageError> {
1752        // Collect tables by their data_source
1753        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1754            .into_iter()
1755            .partition(|id| match self.collections[id].data_source {
1756                DataSource::Table => true,
1757                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1758                _ => panic!("identifier is not a table: {}", id),
1759            });
1760
1761        // Drop table write tables
1762        if table_write_ids.len() > 0 {
1763            let drop_notif = self
1764                .persist_table_worker
1765                .drop_handles(table_write_ids.clone(), ts);
1766            let tx = self.pending_table_handle_drops_tx.clone();
1767            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1768                drop_notif.await;
1769                for identifier in table_write_ids {
1770                    let _ = tx.send(identifier);
1771                }
1772            });
1773        }
1774
1775        // Drop source-fed tables
1776        if data_source_ids.len() > 0 {
1777            self.validate_collection_ids(data_source_ids.iter().cloned())?;
1778            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1779        }
1780
1781        Ok(())
1782    }
1783
1784    fn drop_sources(
1785        &mut self,
1786        storage_metadata: &StorageMetadata,
1787        identifiers: Vec<GlobalId>,
1788    ) -> Result<(), StorageError> {
1789        self.validate_collection_ids(identifiers.iter().cloned())?;
1790        self.drop_sources_unvalidated(storage_metadata, identifiers)
1791    }
1792
    /// Like `drop_sources`, but without validating the identifiers.
    ///
    /// Amends and re-runs ingestions whose subsources are being dropped,
    /// installs read-hold policies that release all capabilities, records
    /// status and statistics cleanup, removes the in-memory collection
    /// state, and finally informs `StorageCollections` of the drops.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError> {
        // Keep track of which ingestions we have to execute still, and which we
        // have to change because of dropped subsources.
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        // Ingestions (and their exports) are also collections, but we keep
        // track of non-ingestion collections separately, because they have
        // slightly different cleanup logic below.
        let mut collections_to_drop = Vec::new();

        // First pass: classify every id by its data source, without mutating
        // any controller-wide state yet (except amending ingestion
        // descriptions for dropped subsources).
        for id in ids.iter() {
            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
                        // could probably use some love and maybe get merged together?
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // If we are dropping source exports, we need to modify the
                        // ingestion that it runs on.
                        //
                        // If we remove this export, we need to stop producing data to
                        // it, so plan to re-execute the ingestion with the amended
                        // description.
                        ingestions_to_execute.insert(ingestion_id);

                        // Adjust the source to remove this export.
                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            // Primary ingestion already dropped.
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        // Ingestion exports also have ReadHolds that we need to
                        // downgrade, and much of their drop machinery is the
                        // same as for the "main" ingestion.
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        // Collections of these types are either not sources and should be dropped
                        // through other means, or are sources but should never be dropped.
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not bother re-executing ingestions we know we plan to drop.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // For ingestions, we fabricate a new hold that will propagate through
        // the cluster and then back to us.

        // We don't explicitly remove read capabilities! Downgrading the
        // frontier of the source to `[]` (the empty Antichain), will propagate
        // to the storage dependencies.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Delete all collection->shard mappings
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        // Record a `Dropped` status event for each dropped ingestion.
        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        // Scoped so the statistics lock is held only for the removals.
        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        // Remove collection state
        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            // Record which replicas were running a collection, so that we can
            // match DroppedId messages against them and eventually remove state
            // from self.dropped_objects
            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    // The remap collection of an ingestion doesn't have extra
                    // state and doesn't have an instance_id, but we still get
                    // upper updates for them, so want to make sure to populate
                    // dropped_ids.
                    // TODO(aljoscha): All this is a bit icky. But here we are
                    // for now...
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }
1988
1989    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
1990    fn drop_sinks(
1991        &mut self,
1992        storage_metadata: &StorageMetadata,
1993        identifiers: Vec<GlobalId>,
1994    ) -> Result<(), StorageError> {
1995        self.validate_export_ids(identifiers.iter().cloned())?;
1996        self.drop_sinks_unvalidated(storage_metadata, identifiers);
1997        Ok(())
1998    }
1999
2000    fn drop_sinks_unvalidated(
2001        &mut self,
2002        storage_metadata: &StorageMetadata,
2003        mut sinks_to_drop: Vec<GlobalId>,
2004    ) {
2005        // Ignore exports that have already been removed.
2006        sinks_to_drop.retain(|id| self.export(*id).is_ok());
2007
2008        // TODO: ideally we'd advance the write frontier ourselves here, but this function's
2009        // not yet marked async.
2010
2011        // We don't explicitly remove read capabilities! Downgrading the
2012        // frontier of the source to `[]` (the empty Antichain), will propagate
2013        // to the storage dependencies.
2014        let drop_policy = sinks_to_drop
2015            .iter()
2016            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2017            .collect();
2018
2019        tracing::debug!(
2020            ?drop_policy,
2021            "dropping sources by setting read hold policies"
2022        );
2023        self.set_hold_policies(drop_policy);
2024
2025        // Record the drop status for all sink drops.
2026        //
2027        // We also delete the items' statistics objects.
2028        //
2029        // The locks are held for a short time, only while we do some removals from a map.
2030
2031        let status_now = mz_ore::now::to_datetime((self.now)());
2032
2033        // Record the drop status for all pending sink drops.
2034        let mut status_updates = vec![];
2035        {
2036            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2037            for id in sinks_to_drop.iter() {
2038                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2039                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
2040            }
2041        }
2042
2043        if !self.read_only {
2044            self.append_status_introspection_updates(
2045                IntrospectionType::SinkStatusHistory,
2046                status_updates,
2047            );
2048        }
2049
2050        // Remove collection/export state
2051        for id in sinks_to_drop.iter() {
2052            tracing::info!(%id, "dropping export state");
2053            let collection = self
2054                .collections
2055                .remove(id)
2056                .expect("list populated after checking that self.collections contains it");
2057
2058            let instance = match &collection.extra_state {
2059                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2060                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2061                CollectionStateExtra::None => None,
2062            }
2063            .and_then(|i| self.instances.get(&i));
2064
2065            // Record how many replicas were running an export, so that we can
2066            // match `DroppedId` messages against it and eventually remove state
2067            // from `self.dropped_objects`.
2068            if let Some(instance) = instance {
2069                let active_replicas = instance.get_active_replicas_for_object(id);
2070                if !active_replicas.is_empty() {
2071                    self.dropped_objects.insert(*id, active_replicas);
2072                }
2073            }
2074        }
2075
2076        // Also let StorageCollections know!
2077        self.storage_collections
2078            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2079    }
2080
2081    #[instrument(level = "debug")]
2082    fn append_table(
2083        &mut self,
2084        write_ts: Timestamp,
2085        advance_to: Timestamp,
2086        commands: Vec<(GlobalId, Vec<TableData>)>,
2087    ) -> Result<tokio::sync::oneshot::Receiver<Result<(), StorageError>>, StorageError> {
2088        if self.read_only {
2089            // While in read only mode, ONLY collections that have been migrated
2090            // and need to be re-hydrated in read only mode can be written to.
2091            if !commands
2092                .iter()
2093                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2094            {
2095                return Err(StorageError::ReadOnly);
2096            }
2097        }
2098
2099        // TODO(petrosagg): validate appends against the expected RelationDesc of the collection
2100        for (id, updates) in commands.iter() {
2101            if !updates.is_empty() {
2102                if !write_ts.less_than(&advance_to) {
2103                    return Err(StorageError::UpdateBeyondUpper(*id));
2104                }
2105            }
2106        }
2107
2108        Ok(self
2109            .persist_table_worker
2110            .append(write_ts, advance_to, commands))
2111    }
2112
    /// Returns a [`MonotonicAppender`] for the collection identified by `id`,
    /// delegating to the collection manager.
    ///
    /// NOTE(review): the error condition is determined by the collection
    /// manager (presumably when `id` is not managed for appends) — confirm in
    /// `collection_mgmt`.
    fn monotonic_appender(&self, id: GlobalId) -> Result<MonotonicAppender, StorageError> {
        self.collection_manager.monotonic_appender(id)
    }
2116
2117    fn webhook_statistics(&self, id: GlobalId) -> Result<Arc<WebhookStatistics>, StorageError> {
2118        // Call to this method are usually cached so the lock is not in the critical path.
2119        let source_statistics = self.source_statistics.lock().expect("poisoned");
2120        source_statistics
2121            .webhook_statistics
2122            .get(&id)
2123            .cloned()
2124            .ok_or(StorageError::IdentifierMissing(id))
2125    }
2126
2127    async fn ready(&mut self) {
2128        if self.maintenance_scheduled {
2129            return;
2130        }
2131
2132        if !self.pending_table_handle_drops_rx.is_empty() {
2133            return;
2134        }
2135
2136        tokio::select! {
2137            Some(m) = self.instance_response_rx.recv() => {
2138                self.stashed_responses.push(m);
2139                while let Ok(m) = self.instance_response_rx.try_recv() {
2140                    self.stashed_responses.push(m);
2141                }
2142            }
2143            _ = self.maintenance_ticker.tick() => {
2144                self.maintenance_scheduled = true;
2145            },
2146        };
2147    }
2148
    /// Processes all stashed cluster responses and any scheduled maintenance,
    /// then drains and applies pending table-handle drops.
    ///
    /// Returns `Ok(Some(Response::FrontierUpdates(..)))` when at least one
    /// collection's write frontier advanced during this call, `Ok(None)`
    /// otherwise. Errors are propagated from `drop_sources` when dropped
    /// table handles are processed.
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response>, anyhow::Error> {
        // Perform periodic maintenance work.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        // Take the currently stashed responses so that we can call mut receiver functions in the loop.
        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    // An object is fully dropped once every replica that was
                    // running it has acknowledged the drop; only then do we
                    // remove it from `self.dropped_objects`.
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    // Note we only hold the locks while moving some plain-old-data around here.
                    {
                        // NOTE(aljoscha): We explicitly unwrap the `Option`,
                        // because we expect that stats coming from replicas
                        // have a replica id.
                        //
                        // If this is `None` and we would use that to access
                        // state below we might clobber something unexpectedly.
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                // We can get updates for collections that have
                                // already been deleted, ignore those.
                                continue;
                            }

                            // Statistics are keyed by (collection, replica), so
                            // each replica's contribution is tracked separately.
                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    {
                        // NOTE(aljoscha); Same as above. We want to be
                        // explicit.
                        //
                        // Also, technically for sinks there is no webhook
                        // "sources" that would force us to use an `Option`. But
                        // we still have to use an option for other reasons: the
                        // scraper expects a trait for working with stats, and
                        // that in the end forces the sink stats map to also
                        // have `Option` in the key. Do I like that? No, but
                        // here we are.
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                // We can get updates for collections that have
                                // already been deleted, ignore those.
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    // NOTE(aljoscha): We sniff out the hydration status for
                    // ingestions from status updates. This is the easiest we
                    // can do right now, without going deeper into changing the
                    // comms protocol between controller and cluster. We cannot,
                    // for example use `StorageResponse::FrontierUpper`,
                    // because those will already get sent when the ingestion is
                    // just being created.
                    //
                    // Sources differ in when they will report as Running. Kafka
                    // UPSERT sources will only switch to `Running` once their
                    // state has been initialized from persist, which is the
                    // first case that we care about right now.
                    //
                    // I wouldn't say it's ideal, but it's workable until we
                    // find something better.
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            if ingestion_state.hydrated_on.is_empty() {
                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                            }
                                            ingestion_state.hydrated_on.insert(replica_id.expect(
                                                "replica id should be present for status running",
                                            ));
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // TODO(sinks): track sink hydration?
                                        }
                                        CollectionStateExtra::None => {
                                            // Nothing to do
                                        }
                                    }
                                }
                                None => (), // no collection, let's say that's fine
                                            // here
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            // TODO: Paused gets sent when there
                                            // are no active replicas. We should
                                            // change this to send a targeted
                                            // Pause for each replica, and do
                                            // more fine-grained hydration
                                            // tracking here.
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                            ingestion_state.hydrated_on.clear();
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // TODO(sinks): track sink hydration?
                                        }
                                        CollectionStateExtra::None => {
                                            // Nothing to do
                                        }
                                    }
                                }
                                None => (), // no collection, let's say that's fine
                                            // here
                            }
                        }
                        _ => (),
                    }

                    // Set replica_id in the status update if available
                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Send a cancel command so our command history is correct. And to
                                // avoid duplicate work once we have active replication.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                // Send the results down our channel.
                                (pending.result_tx)(batches)
                            }
                            None => {
                                // We might not be tracking this oneshot ingestion anymore because
                                // it was canceled.
                            }
                        }
                    }
                }
            }
        }

        // Forward the status updates we collected above.
        self.record_status_updates(status_updates);

        // Process dropped tables in a single batch.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }
2409
2410    async fn inspect_persist_state(
2411        &self,
2412        id: GlobalId,
2413    ) -> Result<serde_json::Value, anyhow::Error> {
2414        let collection = &self.storage_collections.collection_metadata(id)?;
2415        let client = self
2416            .persist
2417            .open(collection.persist_location.clone())
2418            .await?;
2419        let shard_state = client
2420            .inspect_shard::<Timestamp>(&collection.data_shard)
2421            .await?;
2422        let json_state = serde_json::to_value(shard_state)?;
2423        Ok(json_state)
2424    }
2425
2426    fn append_introspection_updates(
2427        &mut self,
2428        type_: IntrospectionType,
2429        updates: Vec<(Row, Diff)>,
2430    ) {
2431        let id = self.introspection_ids[&type_];
2432        let updates = updates.into_iter().map(|update| update.into()).collect();
2433        self.collection_manager.blind_write(id, updates);
2434    }
2435
2436    fn append_status_introspection_updates(
2437        &mut self,
2438        type_: IntrospectionType,
2439        updates: Vec<StatusUpdate>,
2440    ) {
2441        let id = self.introspection_ids[&type_];
2442        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2443        if !updates.is_empty() {
2444            self.collection_manager.blind_write(id, updates);
2445        }
2446    }
2447
2448    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2449        let id = self.introspection_ids[&type_];
2450        self.collection_manager.differential_write(id, op);
2451    }
2452
2453    fn append_only_introspection_tx(
2454        &self,
2455        type_: IntrospectionType,
2456    ) -> mpsc::UnboundedSender<(
2457        Vec<AppendOnlyUpdate>,
2458        oneshot::Sender<Result<(), StorageError>>,
2459    )> {
2460        let id = self.introspection_ids[&type_];
2461        self.collection_manager.append_only_write_sender(id)
2462    }
2463
2464    fn differential_introspection_tx(
2465        &self,
2466        type_: IntrospectionType,
2467    ) -> mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)> {
2468        let id = self.introspection_ids[&type_];
2469        self.collection_manager.differential_write_sender(id)
2470    }
2471
    /// Builds a future that resolves to a "real-time recent" (RTR) timestamp
    /// covering all RTR-capable user sources among `timestamp_objects`.
    ///
    /// Per eligible source, a future is prepared here (snapshot handle, read
    /// hold, and `as_of` are acquired synchronously, while `self` is still
    /// borrowed); the returned boxed future then runs them all and folds the
    /// results into the maximum timestamp. Each per-source future is bounded
    /// by `timeout`; a timeout on any one source fails the whole request with
    /// [`StorageError::RtrTimeout`].
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<Result<Timestamp, StorageError>>, StorageError> {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Only user sources can be read from w/ RTR.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                // Not a storage item, which we accept.
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    // These internal sources do not yet (and might never)
                    // support RTR. However, erroring if they're selected from
                    // poses an annoying user experience, so instead just skip
                    // over them.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                // Skip over all other objects
                _ => {
                    continue;
                }
            };

            // Prepare for getting the external system's frontier.
            let config = self.config().clone();

            // Determine the remap collection we plan to read from.
            //
            // Note that the process of reading from the remap shard is the same
            // as other areas in this code that do the same thing, but we inline
            // it here because we must prove that we have not taken ownership of
            // `self` to move the stream of data from the remap shard into a
            // future.
            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Have to acquire a read hold to prevent the since from advancing
            // while we read.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    // Fetch the remap shard's contents; we must do this first so
                    // that the `as_of` doesn't change.
                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(
                        source_conn,
                        id,
                        config,
                        as_of,
                        remap_subscribe,
                    )
                    .await
                    .map_err(|e| {
                            tracing::debug!(?id, "real time recency error: {:?}", e);
                            e
                        });

                    // Drop once we have read successfully.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        // Run all per-source futures and fold their results into the maximum
        // timestamp; any per-source error or timeout fails the whole request.
        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(Timestamp::MIN, |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError>(std::cmp::max(curr, new))
                })
        }))
    }
2587
2588    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2589        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2590        let Self {
2591            build_info: _,
2592            now: _,
2593            read_only,
2594            collections,
2595            dropped_objects,
2596            persist_table_worker: _,
2597            txns_read: _,
2598            txns_metrics: _,
2599            stashed_responses,
2600            pending_table_handle_drops_tx: _,
2601            pending_table_handle_drops_rx: _,
2602            pending_oneshot_ingestions,
2603            collection_manager: _,
2604            introspection_ids,
2605            introspection_tokens: _,
2606            source_statistics: _,
2607            sink_statistics: _,
2608            statistics_interval_sender: _,
2609            instances,
2610            initialized,
2611            config,
2612            persist_location,
2613            persist: _,
2614            metrics: _,
2615            recorded_frontiers,
2616            recorded_replica_frontiers,
2617            wallclock_lag: _,
2618            wallclock_lag_last_recorded,
2619            storage_collections: _,
2620            migrated_storage_collections,
2621            maintenance_ticker: _,
2622            maintenance_scheduled,
2623            instance_response_tx: _,
2624            instance_response_rx: _,
2625            persist_warm_task: _,
2626        } = self;
2627
2628        let collections: BTreeMap<_, _> = collections
2629            .iter()
2630            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2631            .collect();
2632        let dropped_objects: BTreeMap<_, _> = dropped_objects
2633            .iter()
2634            .map(|(id, rs)| (id.to_string(), format!("{rs:?}")))
2635            .collect();
2636        let stashed_responses: Vec<_> =
2637            stashed_responses.iter().map(|r| format!("{r:?}")).collect();
2638        let pending_oneshot_ingestions: BTreeMap<_, _> = pending_oneshot_ingestions
2639            .iter()
2640            .map(|(uuid, i)| (uuid.to_string(), format!("{i:?}")))
2641            .collect();
2642        let introspection_ids: BTreeMap<_, _> = introspection_ids
2643            .iter()
2644            .map(|(typ, id)| (format!("{typ:?}"), id.to_string()))
2645            .collect();
2646        let instances: BTreeMap<_, _> = instances
2647            .iter()
2648            .map(|(id, i)| (id.to_string(), format!("{i:?}")))
2649            .collect();
2650        let recorded_frontiers: BTreeMap<_, _> = recorded_frontiers
2651            .iter()
2652            .map(|(id, fs)| (id.to_string(), format!("{fs:?}")))
2653            .collect();
2654        let recorded_replica_frontiers: Vec<_> = recorded_replica_frontiers
2655            .iter()
2656            .map(|((gid, rid), f)| (gid.to_string(), rid.to_string(), format!("{f:?}")))
2657            .collect();
2658        let migrated_storage_collections: Vec<_> = migrated_storage_collections
2659            .iter()
2660            .map(|id| id.to_string())
2661            .collect();
2662
2663        Ok(serde_json::json!({
2664            "read_only": read_only,
2665            "collections": collections,
2666            "dropped_objects": dropped_objects,
2667            "stashed_responses": stashed_responses,
2668            "pending_oneshot_ingestions": pending_oneshot_ingestions,
2669            "introspection_ids": introspection_ids,
2670            "instances": instances,
2671            "initialized": initialized,
2672            "config": format!("{config:?}"),
2673            "persist_location": format!("{persist_location:?}"),
2674            "recorded_frontiers": recorded_frontiers,
2675            "recorded_replica_frontiers": recorded_replica_frontiers,
2676            "wallclock_lag_last_recorded": format!("{wallclock_lag_last_recorded:?}"),
2677            "migrated_storage_collections": migrated_storage_collections,
2678            "maintenance_scheduled": maintenance_scheduled,
2679        }))
2680    }
2681}
2682
2683/// Seed [`StorageTxn`] with any state required to instantiate a
2684/// [`StorageController`].
2685///
2686/// This cannot be a member of [`StorageController`] because it cannot take a
2687/// `self` parameter.
2688///
2689pub fn prepare_initialization(txn: &mut dyn StorageTxn) -> Result<(), StorageError> {
2690    if txn.get_txn_wal_shard().is_none() {
2691        let txns_id = ShardId::new();
2692        txn.write_txn_wal_shard(txns_id)?;
2693    }
2694
2695    Ok(())
2696}
2697
2698impl Controller
2699where
2700    Self: StorageController,
2701{
2702    /// Create a new storage controller from a client it should wrap.
2703    ///
2704    /// Note that when creating a new storage controller, you must also
2705    /// reconcile it with the previous state.
2706    ///
2707    /// # Panics
2708    /// If this function is called before [`prepare_initialization`].
2709    pub async fn new(
2710        build_info: &'static BuildInfo,
2711        persist_location: PersistLocation,
2712        persist_clients: Arc<PersistClientCache>,
2713        now: NowFn,
2714        wallclock_lag: WallclockLagFn<Timestamp>,
2715        txns_metrics: Arc<TxnMetrics>,
2716        read_only: bool,
2717        metrics_registry: &MetricsRegistry,
2718        controller_metrics: ControllerMetrics,
2719        connection_context: ConnectionContext,
2720        txn: &dyn StorageTxn,
2721        storage_collections: Arc<dyn StorageCollections + Send + Sync>,
2722    ) -> Self {
2723        let txns_client = persist_clients
2724            .open(persist_location.clone())
2725            .await
2726            .expect("location should be valid");
2727
2728        let persist_warm_task = warm_persist_state_in_background(
2729            txns_client.clone(),
2730            txn.get_collection_metadata().into_values(),
2731        );
2732        let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2733
2734        // This value must be already installed because we must ensure it's
2735        // durably recorded before it is used, otherwise we risk leaking persist
2736        // state.
2737        let txns_id = txn
2738            .get_txn_wal_shard()
2739            .expect("must call prepare initialization before creating storage controller");
2740
2741        let persist_table_worker = if read_only {
2742            let txns_write = txns_client
2743                .open_writer(
2744                    txns_id,
2745                    Arc::new(TxnsCodecRow::desc()),
2746                    Arc::new(UnitSchema),
2747                    Diagnostics {
2748                        shard_name: "txns".to_owned(),
2749                        handle_purpose: "follow txns upper".to_owned(),
2750                    },
2751                )
2752                .await
2753                .expect("txns schema shouldn't change");
2754            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2755        } else {
2756            let mut txns = TxnsHandle::open(
2757                Timestamp::MIN,
2758                txns_client.clone(),
2759                txns_client.dyncfgs().clone(),
2760                Arc::clone(&txns_metrics),
2761                txns_id,
2762                Opaque::encode(&PersistEpoch::default()),
2763            )
2764            .await;
2765            txns.upgrade_version().await;
2766            persist_handles::PersistTableWriteWorker::new_txns(txns)
2767        };
2768        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2769
2770        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2771
2772        let introspection_ids = BTreeMap::new();
2773        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2774
2775        let (statistics_interval_sender, _) =
2776            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2777
2778        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2779            tokio::sync::mpsc::unbounded_channel();
2780
2781        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2782        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2783
2784        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2785
2786        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2787
2788        let now_dt = mz_ore::now::to_datetime(now());
2789
2790        Self {
2791            build_info,
2792            collections: BTreeMap::default(),
2793            dropped_objects: Default::default(),
2794            persist_table_worker,
2795            txns_read,
2796            txns_metrics,
2797            stashed_responses: vec![],
2798            pending_table_handle_drops_tx,
2799            pending_table_handle_drops_rx,
2800            pending_oneshot_ingestions: BTreeMap::default(),
2801            collection_manager,
2802            introspection_ids,
2803            introspection_tokens,
2804            now,
2805            read_only,
2806            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2807                source_statistics: BTreeMap::new(),
2808                webhook_statistics: BTreeMap::new(),
2809            })),
2810            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2811            statistics_interval_sender,
2812            instances: BTreeMap::new(),
2813            initialized: false,
2814            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2815            persist_location,
2816            persist: persist_clients,
2817            metrics,
2818            recorded_frontiers: BTreeMap::new(),
2819            recorded_replica_frontiers: BTreeMap::new(),
2820            wallclock_lag,
2821            wallclock_lag_last_recorded: now_dt,
2822            storage_collections,
2823            migrated_storage_collections: BTreeSet::new(),
2824            maintenance_ticker,
2825            maintenance_scheduled: false,
2826            instance_response_rx,
2827            instance_response_tx,
2828            persist_warm_task,
2829        }
2830    }
2831
2832    // This is different from `set_read_policies`, which is for external users.
2833    // This method is for setting the policy that the controller uses when
2834    // maintaining the read holds that it has for collections/exports at the
2835    // StorageCollections.
2836    //
2837    // This is really only used when dropping things, where we set the
2838    // ReadPolicy to the empty Antichain.
2839    #[instrument(level = "debug")]
2840    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy)>) {
2841        let mut read_capability_changes = BTreeMap::default();
2842
2843        for (id, policy) in policies.into_iter() {
2844            if let Some(collection) = self.collections.get_mut(&id) {
2845                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2846                {
2847                    CollectionStateExtra::Ingestion(ingestion) => (
2848                        ingestion.write_frontier.borrow(),
2849                        &mut ingestion.derived_since,
2850                        &mut ingestion.hold_policy,
2851                    ),
2852                    CollectionStateExtra::None => {
2853                        unreachable!("set_hold_policies is only called for ingestions");
2854                    }
2855                    CollectionStateExtra::Export(export) => (
2856                        export.write_frontier.borrow(),
2857                        &mut export.derived_since,
2858                        &mut export.read_policy,
2859                    ),
2860                };
2861
2862                let new_derived_since = policy.frontier(write_frontier);
2863                let mut update = swap_updates(derived_since, new_derived_since);
2864                if !update.is_empty() {
2865                    read_capability_changes.insert(id, update);
2866                }
2867
2868                *hold_policy = policy;
2869            }
2870        }
2871
2872        if !read_capability_changes.is_empty() {
2873            self.update_hold_capabilities(&mut read_capability_changes);
2874        }
2875    }
2876
2877    #[instrument(level = "debug", fields(updates))]
2878    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<Timestamp>) {
2879        let mut read_capability_changes = BTreeMap::default();
2880
2881        if let Some(collection) = self.collections.get_mut(&id) {
2882            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2883                CollectionStateExtra::Ingestion(ingestion) => (
2884                    &mut ingestion.write_frontier,
2885                    &mut ingestion.derived_since,
2886                    &ingestion.hold_policy,
2887                ),
2888                CollectionStateExtra::None => {
2889                    if matches!(collection.data_source, DataSource::Progress) {
2890                        // We do get these, but can't do anything with it!
2891                    } else {
2892                        tracing::error!(
2893                            ?collection,
2894                            ?new_upper,
2895                            "updated write frontier for collection which is not an ingestion"
2896                        );
2897                    }
2898                    return;
2899                }
2900                CollectionStateExtra::Export(export) => (
2901                    &mut export.write_frontier,
2902                    &mut export.derived_since,
2903                    &export.read_policy,
2904                ),
2905            };
2906
2907            if PartialOrder::less_than(write_frontier, new_upper) {
2908                write_frontier.clone_from(new_upper);
2909            }
2910
2911            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2912            let mut update = swap_updates(derived_since, new_derived_since);
2913            if !update.is_empty() {
2914                read_capability_changes.insert(id, update);
2915            }
2916        } else if self.dropped_objects.contains_key(&id) {
2917            // We dropped an object but might still get updates from cluster
2918            // side, before it notices the drop. This is expected and fine.
2919        } else {
2920            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2921        }
2922
2923        if !read_capability_changes.is_empty() {
2924            self.update_hold_capabilities(&mut read_capability_changes);
2925        }
2926    }
2927
    // This is different from `update_read_capabilities`, which is for external users.
    // This method is for maintaining the read holds that the controller has at
    // the StorageCollections, for storage dependencies.
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(
        &mut self,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<Timestamp>>,
    ) {
        // Location to record consequences that we need to act on.
        //
        // Maps collection id -> (net capability changes, resulting read
        // frontier, the instance/cluster the collection runs on).
        let mut collections_net = BTreeMap::new();

        // We must not rely on any specific relative ordering of `GlobalId`s.
        // That said, it is reasonable to assume that collections generally have
        // greater IDs than their dependencies, so starting with the largest is
        // a useful optimization.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        // Apply the batch to the ingestion's read capabilities;
                        // `update_iter` yields the net changes to the overall
                        // frontier, which we carry forward into `update`.
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        // Fold the net changes into the accumulator and record
                        // the new read frontier for this collection.
                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        // WIP: See if this ever panics in ci.
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        // Same accounting as for ingestions, but against the
                        // export's read capabilities.
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                // This is confusing and we should probably error.
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Translate our net compute actions into `AllowCompaction` commands and
        // downgrade persist sinces.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                // Advance the holds on this collection's dependencies to the
                // newly computed frontier.
                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                // Send AllowCompaction command directly to the instance
                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }
3039
3040    /// Validate that a collection exists for all identifiers, and error if any do not.
3041    fn validate_collection_ids(
3042        &self,
3043        ids: impl Iterator<Item = GlobalId>,
3044    ) -> Result<(), StorageError> {
3045        for id in ids {
3046            self.storage_collections.check_exists(id)?;
3047        }
3048        Ok(())
3049    }
3050
3051    /// Validate that a collection exists for all identifiers, and error if any do not.
3052    fn validate_export_ids(&self, ids: impl Iterator<Item = GlobalId>) -> Result<(), StorageError> {
3053        for id in ids {
3054            self.export(id)?;
3055        }
3056        Ok(())
3057    }
3058
    /// Opens a write handle for the given `shard` and fetches its most recent
    /// upper before returning it, so callers see a current cached upper.
    ///
    /// NOTE(review): earlier documentation here described also opening a
    /// critical since handle, a `since` argument, and a possible `halt!` —
    /// none of which exist in this body. Presumably the critical since handle
    /// is now acquired elsewhere (e.g. by `StorageCollections`); confirm
    /// before relying on the linearization argument in the comment below.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // N.B.
        // Fetch the most recent upper for the write handle. Otherwise, this may be behind
        // the since of the since handle. Its vital this happens AFTER we create
        // the since handle as it needs to be linearized with that operation. It may be true
        // that creating the write handle after the since handle already ensures this, but we
        // do this out of an abundance of caution.
        //
        // Note that this returns the upper, but also sets it on the handle to be fetched later.
        write.fetch_recent_upper().await;

        write
    }
3100
    /// Registers the given introspection collection and does any preparatory
    /// work that we have to do before we start writing to it. This
    /// preparatory work will include partial truncation or other cleanup
    /// schemes, depending on introspection type.
    ///
    /// # Panics
    ///
    /// Panics if an ID was already registered for this `introspection_type`,
    /// or if a migrated (force-writable) collection is not a system
    /// collection.
    ///
    /// # Errors
    ///
    /// Returns an error if `storage_collections` has no metadata for `id`.
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // In read-only mode we create a new shard for all migrated storage collections. So we
        // "trick" the write task into thinking that it's not in read-only mode so something is
        // advancing this new shard.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        // Record which ID serves this introspection type; there must be at
        // most one.
        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        // Lazily opens a leased read handle on the collection's data shard.
        // Used by the differential collection manager when it needs to read
        // back the shard's current contents.
        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            // For these, we first register the collection and then prepare it,
            // because the code that prepares differential collection expects to
            // be able to update desired state via the collection manager
            // already.
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                // These do a shallow copy.
                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            // For these, we first have to prepare and then register with
            // collection manager, because the preparation logic wants to read
            // the shard's contents and then do uncontested writes.
            //
            // TODO(aljoscha): We should make the truncation/cleanup work that
            // happens when we take over instead be a periodic thing, and make
            // it resilient to the upper moving concurrently.
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }
3213
3214    /// Remove statistics for sources/sinks that were dropped but still have statistics rows
3215    /// hanging around.
3216    fn reconcile_dangling_statistics(&self) {
3217        self.source_statistics
3218            .lock()
3219            .expect("poisoned")
3220            .source_statistics
3221            // collections should also contain subsources.
3222            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3223        self.sink_statistics
3224            .lock()
3225            .expect("poisoned")
3226            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3227    }
3228
3229    /// Appends a new global ID, shard ID pair to the appropriate collection.
3230    /// Use a `diff` of 1 to append a new entry; -1 to retract an existing
3231    /// entry.
3232    ///
3233    /// # Panics
3234    /// - If `self.collections` does not have an entry for `global_id`.
3235    /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
3236    ///   a managed collection.
3237    /// - If diff is any value other than `1` or `-1`.
3238    #[instrument(level = "debug")]
3239    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3240    where
3241        I: Iterator<Item = GlobalId>,
3242    {
3243        mz_ore::soft_assert_or_log!(
3244            diff == Diff::MINUS_ONE || diff == Diff::ONE,
3245            "use 1 for insert or -1 for delete"
3246        );
3247
3248        let id = *self
3249            .introspection_ids
3250            .get(&IntrospectionType::ShardMapping)
3251            .expect("should be registered before this call");
3252
3253        let mut updates = vec![];
3254        // Pack updates into rows
3255        let mut row_buf = Row::default();
3256
3257        for global_id in global_ids {
3258            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3259                collection.collection_metadata.data_shard.clone()
3260            } else {
3261                panic!("unknown global id: {}", global_id);
3262            };
3263
3264            let mut packer = row_buf.packer();
3265            packer.push(Datum::from(global_id.to_string().as_str()));
3266            packer.push(Datum::from(shard_id.to_string().as_str()));
3267            updates.push((row_buf.clone(), diff));
3268        }
3269
3270        self.collection_manager.differential_append(id, updates);
3271    }
3272
3273    /// Determines and returns this collection's dependencies, if any.
3274    fn determine_collection_dependencies(
3275        &self,
3276        self_id: GlobalId,
3277        collection_desc: &CollectionDescription,
3278    ) -> Result<Vec<GlobalId>, StorageError> {
3279        let mut dependencies = Vec::new();
3280
3281        if let Some(id) = collection_desc.primary {
3282            dependencies.push(id);
3283        }
3284
3285        match &collection_desc.data_source {
3286            DataSource::Introspection(_)
3287            | DataSource::Webhook
3288            | DataSource::Table
3289            | DataSource::Progress
3290            | DataSource::Other => (),
3291            DataSource::IngestionExport { ingestion_id, .. } => {
3292                // Ingestion exports depend on their primary source's remap
3293                // collection.
3294                let source_collection = self.collection(*ingestion_id)?;
3295                let ingestion_remap_collection_id = match &source_collection.data_source {
3296                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3297                    _ => unreachable!(
3298                        "SourceExport must only refer to primary sources that already exist"
3299                    ),
3300                };
3301
3302                // Ingestion exports (aka. subsources) must make sure that 1)
3303                // their own collection's since stays one step behind the upper,
3304                // and, 2) that the remap shard's since stays one step behind
3305                // their upper. Hence they track themselves and the remap shard
3306                // as dependencies.
3307                dependencies.extend([self_id, ingestion_remap_collection_id]);
3308            }
3309            // Ingestions depend on their remap collection.
3310            DataSource::Ingestion(ingestion) => {
3311                // Ingestions must make sure that 1) their own collection's
3312                // since stays one step behind the upper, and, 2) that the remap
3313                // shard's since stays one step behind their upper. Hence they
3314                // track themselves and the remap shard as dependencies.
3315                dependencies.push(self_id);
3316                if self_id != ingestion.remap_collection_id {
3317                    dependencies.push(ingestion.remap_collection_id);
3318                }
3319            }
3320            DataSource::Sink { desc } => {
3321                // Sinks hold back their own frontier and the frontier of their input.
3322                dependencies.extend([self_id, desc.sink.from]);
3323            }
3324        };
3325
3326        Ok(dependencies)
3327    }
3328
3329    async fn read_handle_for_snapshot(
3330        &self,
3331        id: GlobalId,
3332    ) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
3333        let metadata = self.storage_collections.collection_metadata(id)?;
3334        read_handle_for_snapshot(&self.persist, id, &metadata).await
3335    }
3336
3337    /// Handles writing of status updates for sources/sinks to the appropriate
3338    /// status relation
3339    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3340        if self.read_only {
3341            return;
3342        }
3343
3344        let mut sink_status_updates = vec![];
3345        let mut source_status_updates = vec![];
3346
3347        for update in updates {
3348            let id = update.id;
3349            if self.export(id).is_ok() {
3350                sink_status_updates.push(update);
3351            } else if self.storage_collections.check_exists(id).is_ok() {
3352                source_status_updates.push(update);
3353            }
3354        }
3355
3356        self.append_status_introspection_updates(
3357            IntrospectionType::SourceStatusHistory,
3358            source_status_updates,
3359        );
3360        self.append_status_introspection_updates(
3361            IntrospectionType::SinkStatusHistory,
3362            sink_status_updates,
3363        );
3364    }
3365
3366    fn collection(&self, id: GlobalId) -> Result<&CollectionState, StorageError> {
3367        self.collections
3368            .get(&id)
3369            .ok_or(StorageError::IdentifierMissing(id))
3370    }
3371
3372    /// Runs the identified ingestion using the current definition of the
3373    /// ingestion in-memory.
3374    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError> {
3375        tracing::info!(%id, "starting ingestion");
3376
3377        let collection = self.collection(id)?;
3378        let ingestion_description = match &collection.data_source {
3379            DataSource::Ingestion(i) => i.clone(),
3380            _ => {
3381                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3382                Err(StorageError::IdentifierInvalid(id))?
3383            }
3384        };
3385
3386        // Enrich all of the exports with their metadata
3387        let mut source_exports = BTreeMap::new();
3388        for (export_id, export) in ingestion_description.source_exports.clone() {
3389            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3390            source_exports.insert(
3391                export_id,
3392                SourceExport {
3393                    storage_metadata: export_storage_metadata,
3394                    details: export.details,
3395                    data_config: export.data_config,
3396                },
3397            );
3398        }
3399
3400        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;
3401
3402        let description = IngestionDescription::<CollectionMetadata> {
3403            source_exports,
3404            remap_metadata: remap_collection.collection_metadata.clone(),
3405            // The rest of the fields are identical
3406            desc: ingestion_description.desc.clone(),
3407            instance_id: ingestion_description.instance_id,
3408            remap_collection_id: ingestion_description.remap_collection_id,
3409        };
3410
3411        let storage_instance_id = description.instance_id;
3412        // Fetch the client for this ingestion's instance.
3413        let instance = self
3414            .instances
3415            .get_mut(&storage_instance_id)
3416            .ok_or_else(|| StorageError::IngestionInstanceMissing {
3417                storage_instance_id,
3418                ingestion_id: id,
3419            })?;
3420
3421        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3422        instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3423
3424        Ok(())
3425    }
3426
    /// Runs the identified export using the current definition of the export
    /// that we have in memory.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not present in `self.collections` (the index
    /// expression below panics on a missing key).
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        // Resolve persist metadata for the sink's input collection and for
        // the sink's own (output) collection.
        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
        // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
        // reading it; otherwise assume we may have to replay from the beginning.
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        // Assemble the full sink description, enriched with the resolved
        // storage metadata and the computed as-of/snapshot decision.
        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: description.sink.commit_interval,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        // Hand the command to the instance that hosts this export.
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }
3487
    /// Update introspection with the current frontiers of storage objects.
    ///
    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
    /// second during normal operation.
    fn update_frontier_introspection(&mut self) {
        // Collect the currently observed frontiers: per collection (global) and
        // per (collection, replica) pair.
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            // Find the instance hosting this collection (if any), so the write
            // frontier can be attributed to each of its replicas.
            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        // Accumulators for the differential updates (insertions/retractions) we
        // will write to the two introspection collections.
        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        // Pack one `Frontiers` introspection row; empty frontiers render as NULL.
        let mut push_global_update =
            |id: GlobalId,
             (since, upper): (Antichain<Timestamp>, Antichain<Timestamp>),
             diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        // Pack one `ReplicaFrontiers` introspection row.
        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<Timestamp>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Diff the newly observed global frontiers against the previously
        // recorded ones: insert new/changed entries, retract changed/removed
        // entries, and leave unchanged entries untouched.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        // Entries remaining in the old map have disappeared; retract them.
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        // Same diffing scheme for the per-replica frontiers.
        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        // Write the accumulated updates to the introspection collections.
        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }
3588
3589    /// Refresh the wallclock lag introspection and metrics with the current lag values.
3590    ///
3591    /// This method produces wallclock lag metrics of two different shapes:
3592    ///
3593    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
3594    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
3595    ///   over the last minute, together with the current time.
3596    /// * Histograms: For each collection, we measure the lag of the write frontier behind
3597    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
3598    ///   together with the current histogram period.
3599    ///
3600    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
3601    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
3602    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
3603    /// Prometheus.
3604    ///
3605    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3606    /// second during normal operation.
3607    fn refresh_wallclock_lag(&mut self) {
3608        let now_ms = (self.now)();
3609        let histogram_period =
3610            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3611
3612        let frontier_lag = |frontier: &Antichain<Timestamp>| match frontier.as_option() {
3613            Some(ts) => (self.wallclock_lag)(*ts),
3614            None => Duration::ZERO,
3615        };
3616
3617        for frontiers in self.storage_collections.active_collection_frontiers() {
3618            let id = frontiers.id;
3619            let Some(collection) = self.collections.get_mut(&id) else {
3620                continue;
3621            };
3622
3623            let collection_unreadable =
3624                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
3625            let lag = if collection_unreadable {
3626                WallclockLag::Undefined
3627            } else {
3628                let lag = frontier_lag(&frontiers.write_frontier);
3629                WallclockLag::Seconds(lag.as_secs())
3630            };
3631
3632            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);
3633
3634            // No way to specify values as undefined in Prometheus metrics, so we use the
3635            // maximum value instead.
3636            let secs = lag.unwrap_seconds_or(u64::MAX);
3637            collection.wallclock_lag_metrics.observe(secs);
3638
3639            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
3640                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());
3641
3642                let instance_id = match &collection.extra_state {
3643                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3644                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3645                    CollectionStateExtra::None => None,
3646                };
3647                let workload_class = instance_id
3648                    .and_then(|id| self.instances.get(&id))
3649                    .and_then(|i| i.workload_class.clone());
3650                let labels = match workload_class {
3651                    Some(wc) => [("workload_class", wc.clone())].into(),
3652                    None => BTreeMap::new(),
3653                };
3654
3655                let key = (histogram_period, bucket, labels);
3656                *stash.entry(key).or_default() += Diff::ONE;
3657            }
3658        }
3659
3660        // Record lags to persist, if it's time.
3661        self.maybe_record_wallclock_lag();
3662    }
3663
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The compute controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        // Read-only controllers must not write to introspection collections.
        if self.read_only {
            return;
        }

        // Truncate a datetime down to the previous multiple of `interval`;
        // `None` if the interval can't be represented as a `TimeDelta` or the
        // truncation fails.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            // Fall back to the default interval if the configured one is unusable.
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Only record once per interval multiple.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // Take (and reset) the maximum lag observed since the last recording.
            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            // Drain the stashed histogram measurements into introspection rows,
            // reusing `row_buf` to avoid an allocation per row.
            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
3746
3747    /// Run periodic tasks.
3748    ///
3749    /// This method is invoked roughly once per second during normal operation. It is a good place
3750    /// for tasks that need to run periodically, such as state cleanup or updating of metrics.
3751    fn maintain(&mut self) {
3752        self.update_frontier_introspection();
3753        self.refresh_wallclock_lag();
3754
3755        // Perform instance maintenance work.
3756        for instance in self.instances.values_mut() {
3757            instance.refresh_state_metrics();
3758        }
3759    }
3760}
3761
/// Maps each introspection type to the kind of collection manager used to
/// maintain it: differentially-updated collections vs. append-only histories.
impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            // Collections whose contents are maintained as a changing set of
            // rows via insertions and retractions.
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            // History-style collections that only ever receive appended rows.
            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}
3791
3792/// Get the current rows in the given statistics table. This is used to bootstrap
3793/// the statistics tasks.
3794///
3795// TODO(guswynn): we need to be more careful about the update time we get here:
3796// <https://github.com/MaterializeInc/database-issues/issues/7564>
3797async fn snapshot_statistics(
3798    id: GlobalId,
3799    upper: Antichain<Timestamp>,
3800    storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
3801) -> Vec<Row> {
3802    match upper.as_option() {
3803        Some(f) if f > &Timestamp::MIN => {
3804            let as_of = f.step_back().unwrap();
3805
3806            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3807            snapshot
3808                .into_iter()
3809                .map(|(row, diff)| {
3810                    assert_eq!(diff, 1);
3811                    row
3812                })
3813                .collect()
3814        }
3815        // If collection is closed or the frontier is the minimum, we cannot
3816        // or don't need to truncate (respectively).
3817        _ => Vec::new(),
3818    }
3819}
3820
/// Opens a fresh leased [`ReadHandle`] on the collection's data shard, suitable
/// for taking a snapshot of the collection identified by `id`.
///
/// The returned handle should be expired by the caller once the snapshot is
/// taken (see the comment below on why handles are not kept around).
///
/// # Panics
///
/// Panics if the persist location cannot be opened or the reader cannot be
/// created (invalid persist usage).
async fn read_handle_for_snapshot(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    // We create a new read handle every time someone requests a snapshot and then immediately
    // expire it instead of keeping a read handle permanently in our state to avoid having it
    // heartbeat continously. The assumption is that calls to snapshot are rare and therefore
    // worth it to always create a new handle.
    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}
3850
/// State maintained about individual collections.
#[derive(Debug)]
struct CollectionState {
    /// The source of this collection's data.
    pub data_source: DataSource,

    /// Metadata describing this collection's persist shard (location, shard id,
    /// relation description).
    pub collection_metadata: CollectionMetadata,

    /// Additional per-collection-type state (ingestion or export), if any.
    pub extra_state: CollectionStateExtra,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    wallclock_lag_max: WallclockLag,
    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    /// Frontier wallclock lag metrics tracked for this collection.
    wallclock_lag_metrics: WallclockLagMetrics,
}
3882
3883impl CollectionState {
3884    fn new(
3885        data_source: DataSource,
3886        collection_metadata: CollectionMetadata,
3887        extra_state: CollectionStateExtra,
3888        wallclock_lag_metrics: WallclockLagMetrics,
3889    ) -> Self {
3890        // Only collect wallclock lag histogram data for collections written by storage, to avoid
3891        // duplicate measurements. Collections written by other components (e.g. compute) have
3892        // their wallclock lags recorded by these components.
3893        let wallclock_lag_histogram_stash = match &data_source {
3894            DataSource::Other => None,
3895            _ => Some(Default::default()),
3896        };
3897
3898        Self {
3899            data_source,
3900            collection_metadata,
3901            extra_state,
3902            wallclock_lag_max: WallclockLag::MIN,
3903            wallclock_lag_histogram_stash,
3904            wallclock_lag_metrics,
3905        }
3906    }
3907}
3908
/// Additional state that the controller maintains for select collection types.
#[derive(Debug)]
enum CollectionStateExtra {
    /// State for ingestions and ingestion exports.
    Ingestion(IngestionState),
    /// State for sink exports.
    Export(ExportState),
    /// No additional state is maintained for this collection type.
    None,
}
3916
/// State maintained about ingestions and ingestion exports.
#[derive(Debug)]
struct IngestionState {
    /// Accumulated read capabilities on this ingestion.
    ///
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<Timestamp>,

    /// The current since frontier, derived from `write_frontier` using
    /// `hold_policy`.
    pub derived_since: Antichain<Timestamp>,

    /// Holds that this ingestion (or ingestion export) has on its dependencies.
    pub dependency_read_holds: Vec<ReadHold>,

    /// Reported write frontier.
    pub write_frontier: Antichain<Timestamp>,

    /// The policy that drives how we downgrade our read hold. That is how we
    /// derive our since from our upper.
    ///
    /// This is a _storage-controller-internal_ policy used to derive its
    /// personal read hold on the collection. It should not be confused with any
    /// read policies that the adapter might install at [StorageCollections].
    pub hold_policy: ReadPolicy,

    /// The ID of the instance in which the ingestion is running.
    pub instance_id: StorageInstanceId,

    /// Set of replica IDs on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}
3947
/// A description of a status history collection.
///
/// Used to inform partial truncation, see
/// [`collection_mgmt::partially_truncate_status_history`].
struct StatusHistoryDesc<K> {
    /// How rows are retained (and which excess rows are truncated).
    retention_policy: StatusHistoryRetentionPolicy,
    /// Extracts the truncation key from a row's datums.
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    /// Extracts the event time from a row's datums.
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}
/// The retention policy applied when truncating a status history collection.
enum StatusHistoryRetentionPolicy {
    /// Truncates everything but the last N updates for each key.
    LastN(usize),
    /// Truncates everything past the time window for each key.
    TimeWindow(Duration),
}
3963
3964fn source_status_history_desc(
3965    params: &StorageParameters,
3966) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3967    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3968    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3969    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3970    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3971
3972    StatusHistoryDesc {
3973        retention_policy: StatusHistoryRetentionPolicy::LastN(
3974            params.keep_n_source_status_history_entries,
3975        ),
3976        extract_key: Box::new(move |datums| {
3977            (
3978                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3979                if datums[replica_id_idx].is_null() {
3980                    None
3981                } else {
3982                    Some(
3983                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3984                            .expect("ReplicaId column"),
3985                    )
3986                },
3987            )
3988        }),
3989        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3990    }
3991}
3992
3993fn sink_status_history_desc(
3994    params: &StorageParameters,
3995) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3996    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3997    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3998    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3999    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4000
4001    StatusHistoryDesc {
4002        retention_policy: StatusHistoryRetentionPolicy::LastN(
4003            params.keep_n_sink_status_history_entries,
4004        ),
4005        extract_key: Box::new(move |datums| {
4006            (
4007                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
4008                if datums[replica_id_idx].is_null() {
4009                    None
4010                } else {
4011                    Some(
4012                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4013                            .expect("ReplicaId column"),
4014                    )
4015                },
4016            )
4017        }),
4018        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4019    }
4020}
4021
4022fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
4023    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
4024    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
4025    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4026
4027    StatusHistoryDesc {
4028        retention_policy: StatusHistoryRetentionPolicy::LastN(
4029            params.keep_n_privatelink_status_history_entries,
4030        ),
4031        extract_key: Box::new(move |datums| {
4032            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4033        }),
4034        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4035    }
4036}
4037
4038fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4039    let desc = &REPLICA_STATUS_HISTORY_DESC;
4040    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4041    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4042    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4043
4044    StatusHistoryDesc {
4045        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4046            params.replica_status_history_retention_window,
4047        ),
4048        extract_key: Box::new(move |datums| {
4049            (
4050                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4051                datums[process_idx].unwrap_uint64(),
4052            )
4053        }),
4054        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4055    }
4056}
4057
4058/// Replace one antichain with another, tracking the overall changes in the returned `ChangeBatch`.
4059fn swap_updates(
4060    from: &mut Antichain<Timestamp>,
4061    mut replace_with: Antichain<Timestamp>,
4062) -> ChangeBatch<Timestamp> {
4063    let mut update = ChangeBatch::new();
4064    if PartialOrder::less_equal(from, &replace_with) {
4065        update.extend(replace_with.iter().map(|time| (*time, 1)));
4066        std::mem::swap(from, &mut replace_with);
4067        update.extend(replace_with.iter().map(|time| (*time, -1)));
4068    }
4069    update
4070}