mz_storage_controller/
lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the storage controller trait.

use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
    StatusUpdate, StorageCommand, StorageResponse, TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to provide results of the ingestion.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// Cluster currently running this ingestion.
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
    /// Consume the pending ingestion, responding with a cancelation message.
    ///
    /// TODO(cf2): Refine these error messages so they're not stringly typed.
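    ///
    /// A minimal usage sketch; the `ingestion_uuid` name is hypothetical, and the
    /// controller is assumed to have already removed the entry from its
    /// `pending_oneshot_ingestions` map:
    ///
    /// ```ignore
    /// if let Some(pending) = pending_oneshot_ingestions.remove(&ingestion_uuid) {
    ///     pending.cancel();
    /// }
    /// ```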
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

/// A storage controller for a storage instance.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,

    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,

    /// Collections maintained by the storage controller.
    ///
    /// This collection only grows, although individual collections may be rendered unusable.
    /// This is to prevent the re-binding of identifiers to other descriptions.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Map from IDs of objects that have been dropped to replicas we are still
    /// expecting DroppedId messages from. This is cleared out once all replicas
    /// have responded.
    ///
    /// We use this only to catch problems in the protocol between controller
    /// and replicas; for example, it lets us differentiate between late messages
    /// for objects that have already been dropped and unexpected (read: erroneous)
    /// messages from the replica.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Write handle for table shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,
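    /// Metrics for txn-wal usage by this controller.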
    txns_metrics: Arc<TxnMetrics>,
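    /// Storage responses (paired with the responding replica, if any) that have
    /// been received but not yet processed.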
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Channel for sending table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Channel for receiving table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Closures that can be used to send responses from oneshot ingestions.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Tracks which collection is responsible for which [`IntrospectionType`].
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for tasks that drive updating introspection collections. Dropping
    /// these makes sure that any such tasks (and other resources) stop when
    /// needed.
    // TODO(aljoscha): Should these live somewhere else?
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    // The following two fields must always be locked in order.
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates` messages, as
    /// well as webhook statistics.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates` messages.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// A way to update the statistics interval in the statistics tasks.
    statistics_interval_sender: Sender<Duration>,

    /// Clients for all known storage instances.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration to apply to newly provisioned instances, and use during purification.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,
    /// Metrics of the storage controller.
    metrics: StorageControllerMetrics,
    /// `(read, write)` frontiers that have been recorded in the `Frontiers` collection, kept to be
    /// able to retract old rows.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// Write frontiers that have been recorded in the `ReplicaFrontiers` collection, kept to be
    /// able to retract old rows.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function that computes the lag between the given time and wallclock time.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to a [StorageCollections].
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Migrated storage collections that can be written even in read-only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Shared transmit channel for replicas to send responses.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receive end for replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task run at startup to warm persist state.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

/// Warm up persist state for `shard_ids` in a background task.
///
/// With better parallelism during startup this would likely be unnecessary, but empirically we see
/// some nice speedups with this relatively simple function.
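///
/// A minimal usage sketch (the `persist_client` and `shard_ids` values are
/// hypothetical stand-ins for what the controller holds at startup):
///
/// ```ignore
/// let warm_task = warm_persist_state_in_background(persist_client, shard_ids.into_iter());
/// // The returned handle is kept by the controller and dropped once bootstrap
/// // has connected to all collection shards.
/// ```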
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        // We serialize the dyncfg updates in StorageParameters, but configure
        // persist separately.
        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    /// Get the current configuration.
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on zero-replica clusters are always considered
            // hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                // An ingestion is hydrated if it is hydrated on at least one replica.
                Ok(ingestion_state.hydrated_on.len() >= 1)
            }
            CollectionStateExtra::Export(_) => {
                // For now, sinks are always considered hydrated. We rely on
                // them starting up "instantly" and don't wait for them when
                // checking hydration status of a replica. TODO(sinks): base
                // this off of the sink shard's frontier?
                Ok(true)
            }
            CollectionStateExtra::None => {
                // For now, objects that are not ingestions are always
                // considered hydrated. This is tables and webhooks, as of
                // today.
                Ok(true)
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        // If an empty set of replicas is provided, there can be no collections
        // on them; we count this as hydrated.
        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
            return Ok(true);
        }

        // If target_replica_ids is provided, use it as the set of target
        // replicas. Otherwise check for hydration on any replica.
        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    if &state.instance_id != target_cluster_id {
                        continue;
                    }
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => {
                            // No target replicas were given, so check that it's
                            // hydrated on at least one replica.
                            state.hydrated_on.len() >= 1
                        }
                    }
                }
                CollectionStateExtra::Export(_) => {
                    // For now, sinks are always considered hydrated. We rely on
                    // them starting up "instantly" and don't wait for them when
                    // checking hydration status of a replica. TODO(sinks):
                    // base this off of the sink shard's frontier?
                    true
                }
                CollectionStateExtra::None => {
                    // For now, objects that are not ingestions are always
                    // considered hydrated. This is tables and webhooks, as of
                    // today.
                    true
                }
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
                // We continue with our loop instead of breaking out early, so
                // that we log all non-hydrated collections.
            }
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        // In theory, we could pull all our frontiers from storage collections...
        // but in practice those frontiers may not be identical. For historical reasons, we use the
        // locally-tracked frontier for sinks but the storage-collections-maintained frontier for
        // sources.
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        let active_storage_collections: BTreeMap<_, _> = self
            .storage_collections
            .active_collection_frontiers()
            .into_iter()
            .map(|c| (c.id, c))
            .collect();

        let active_exports = self.instances[&instance_id]
            .active_ingestion_exports()
            .filter(move |id| {
                let frontiers = active_storage_collections.get(id);
                match frontiers {
                    Some(frontiers) => !frontiers.write_frontier.is_empty(),
                    None => {
                        // Not "active", so we don't care here.
                        false
                    }
                }
            });

        Box::new(active_exports)
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            metrics,
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                // NOTE(aljoscha): We filter out the remap collection for old-style
                // ingestions because we don't get any status updates about it from the
                // replica side, so we don't want to synthesize a 'paused' status here.
                // New-style ingestions do get updates, since the source itself contains
                // the remap data.
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

    /// Create and "execute" the described collection.
    ///
    /// "Execute" is in scare quotes because what executing a collection means
    /// varies widely based on the type of collection you're creating.
    ///
    /// The general process of creating a collection is:
    /// 1. Enrich the description we get from the user with metadata that only
    ///    the storage controller knows. This is mostly a matter of separating
    ///    concerns.
    /// 2. Generate write and read persist handles for the collection.
    /// 3. Store the collection's metadata in the appropriate field.
    /// 4. "Execute" the collection. What that means is contingent on the type
    ///    of collection, so consult the code for more details.
    ///
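    /// A minimal call sketch (the `storage_metadata`, `register_ts`, `table_id`,
    /// and `table_description` values are hypothetical stand-ins for what the
    /// adapter supplies during bootstrap):
    ///
    /// ```ignore
    /// controller
    ///     .create_collections_for_bootstrap(
    ///         &storage_metadata,
    ///         Some(register_ts),
    ///         vec![(table_id, table_description)],
    ///         &BTreeSet::new(),
    ///     )
    ///     .await?;
    /// ```
    ///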
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods. @guswynn observes
    // that a number of these operations could be moved into fns on
    // `DataSource`.
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // At this point we're connected to all the collection shards in persist. Our warming task
        // is no longer useful, so abort it if it's still running.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state. It is an error to:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially, tables), then we need to
                // pass along the shard id for the txns shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // Open a persist client so that we can open persist handles for each
        // collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Reborrow `&mut self` as an immutable reference, since the concurrent work processed in
        // this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered` (which
            // `buffer_unordered` uses underneath):
            // - One is captured here <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a `FuturesUnordered`
            // stream attempts to obtain an async mutex that is also obtained in the futures
            // being polled.
            //
            // Both of these could potentially be issues in all usages of `buffer_unordered` in
            // this method, so we stick to the standard advice: only use `try_collect` or
            // `collect`!
            .try_collect()
            .await?;

        // The set of collections that we should render at the end of this
        // function.
        let mut to_execute = BTreeSet::new();
        // New collections that are being created; this is distinct from the set
        // of collections we plan to execute because
        // `DataSource::IngestionExport` is added as a new collection, but is
        // not executed directly.
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        // Reorder in dependency order.
        to_register.sort_by_key(|(id, ..)| *id);

        // Register tables first, but register them in reverse order since earlier tables
        // can depend on later tables.
        //
        // Note: We could do more complex sorting to avoid the allocations, but IMO it's
        // easier to reason about it this way.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics need a level of indirection so we can mutably borrow
        // `self` when registering collections and when we are inserting
        // statistics.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            // Determine if this collection has another dependency.
            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            // Assert some invariants.
            //
            // TODO(alter_table): Include Tables (is_in_txns) in this check. After
            // supporting ALTER TABLE, it's now possible for a table to have a dependency
            // and thus run this check. But tables are managed by txn-wal and thus the
            // upper of the shard's `write_handle` generally isn't the logical upper of
            // the shard. Instead we need to thread through the upper of the `txn` shard
            // here so we can check this invariant.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                // As the halt message says, this can happen when trying to come
                // up in read-only mode and the current read-write
                // environmentd/controller deletes a collection. We exit
                // gracefully, which means we'll get restarted and get to try
                // again.
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                // The dependency since cannot be beyond the dependent (our)
                // upper unless the collection is new. In practice, the
                // dependency is the remap shard of a source (export), and if
                // the since is allowed to "catch up" to the upper, that is
                // `upper <= since`, a restarting ingestion cannot differentiate
                // between updates that have already been written out to the
                // backing persist shard and updates that have yet to be
                // written. We would write duplicate updates.
                //
                // If this check fails, it means that the read hold installed on
                // the dependency was probably not upheld –– if it were, the
                // dependency's since could not have advanced as far as the
                // dependent's upper.
                //
                // We don't care about the dependency since when the write
                // frontier is empty. In that case, no-one can write down any
                // more updates.
                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                            dependent ({id}): upper {:?} \n
                            dependency since {:?} \n
                            dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            // Perform data source-specific setup.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    // We always register the collection with the collection manager,
                    // regardless of read-only mode. The CollectionManager itself is
                    // aware of read-only mode and will not attempt to write before told
                    // to do so.
                    //
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    // This collection of statistics is periodically aggregated into
                    // `source_statistics`.
                    new_webhook_statistic_entries.insert(id);
                    // Register the collection so our manager knows about it.
                    //
                    // NOTE: Maybe this shouldn't be in the collection manager, and the
                    // collection manager should only be responsible for built-in
                    // introspection collections?
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            // Record the ingestion's cluster ID for the
                            // ingestion export. This way we always have a
                            // record of it, even if the ingestion's collection
                            // description disappears.
                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the source export doesn't do anything; ensure we execute the source instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            // Webhooks don't run on clusters/replicas, so we initialize their
            // statistics collection here.
            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }

            // Sources and sinks only have statistics in the collection when
            // there is a replica that is reporting them. No need to initialize
            // here.
        }

        // Register the tables all in one batch.
        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we use a special read-only table worker
                // that allows writing to migrated tables and will continually
                // bump their shard upper so that it tracks the txn shard upper.
                // We do this so that they remain readable at a recent
                // timestamp, which in turn allows dataflows that depend on them
                // to (re-)hydrate.
                //
                // We only want to register migrated tables, though, and leave
                // existing tables out/never write to them in read-only mode.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // TODO(guswynn): perform the io in this final section concurrently.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table { .. }
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

1229    fn check_alter_ingestion_source_desc(
1230        &mut self,
1231        ingestion_id: GlobalId,
1232        source_desc: &SourceDesc,
1233    ) -> Result<(), StorageError<Self::Timestamp>> {
1234        let source_collection = self.collection(ingestion_id)?;
1235        let data_source = &source_collection.data_source;
1236        match &data_source {
1237            DataSource::Ingestion(cur_ingestion) => {
1238                cur_ingestion
1239                    .desc
1240                    .alter_compatible(ingestion_id, source_desc)?;
1241            }
1242            o => {
1243                tracing::info!(
1244                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1245                    o
1246                );
1247                Err(AlterError { id: ingestion_id })?
1248            }
1249        }
1250
1251        Ok(())
1252    }
1253
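    /// Updates the source connections of the given ingestions, informing
    /// `StorageCollections` of the change and re-running any ingestion whose
    /// connection actually changed.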
1254    async fn alter_ingestion_connections(
1255        &mut self,
1256        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1257    ) -> Result<(), StorageError<Self::Timestamp>> {
1258        // Also have to let StorageCollections know!
1259        self.storage_collections
1260            .alter_ingestion_connections(source_connections.clone())
1261            .await?;
1262
1263        let mut ingestions_to_run = BTreeSet::new();
1264
1265        for (id, conn) in source_connections {
1266            let collection = self
1267                .collections
1268                .get_mut(&id)
1269                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1270
1271            match &mut collection.data_source {
1272                DataSource::Ingestion(ingestion) => {
1273                    // If the connection hasn't changed, there's no sense in
1274                    // re-rendering the dataflow.
1275                    if ingestion.desc.connection != conn {
1276                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1277                        ingestion.desc.connection = conn;
1278                        ingestions_to_run.insert(id);
1279                    } else {
1280                        tracing::warn!(
1281                            "update_source_connection called on {id} but the \
1282                            connection was the same"
1283                        );
1284                    }
1285                }
1286                o => {
1287                    tracing::warn!("update_source_connection called on {:?}", o);
1288                    Err(StorageError::IdentifierInvalid(id))?;
1289                }
1290            }
1291        }
1292
1293        for id in ingestions_to_run {
1294            self.run_ingestion(id)?;
1295        }
1296        Ok(())
1297    }
1298
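    /// Updates the data configs of the given source exports, both on the
    /// export collections themselves and on the ingestions that own them, and
    /// re-runs any ingestion whose config actually changed.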
1299    async fn alter_ingestion_export_data_configs(
1300        &mut self,
1301        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1302    ) -> Result<(), StorageError<Self::Timestamp>> {
1303        // Also have to let StorageCollections know!
1304        self.storage_collections
1305            .alter_ingestion_export_data_configs(source_exports.clone())
1306            .await?;
1307
1308        let mut ingestions_to_run = BTreeSet::new();
1309
1310        for (source_export_id, new_data_config) in source_exports {
1311            // We need to adjust the data config on the CollectionState for
1312            // the source export collection directly
1313            let source_export_collection = self
1314                .collections
1315                .get_mut(&source_export_id)
1316                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1317            let ingestion_id = match &mut source_export_collection.data_source {
1318                DataSource::IngestionExport {
1319                    ingestion_id,
1320                    details: _,
1321                    data_config,
1322                } => {
1323                    *data_config = new_data_config.clone();
1324                    *ingestion_id
1325                }
1326                o => {
1327                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1328                    Err(StorageError::IdentifierInvalid(source_export_id))?
1329                }
1330            };
1331            // We also need to adjust the data config on the CollectionState of the
1332            // Ingestion that the export is associated with.
1333            let ingestion_collection = self
1334                .collections
1335                .get_mut(&ingestion_id)
1336                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
1337
1338            match &mut ingestion_collection.data_source {
1339                DataSource::Ingestion(ingestion_desc) => {
1340                    let source_export = ingestion_desc
1341                        .source_exports
1342                        .get_mut(&source_export_id)
1343                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1344
1345                    // If the data config hasn't changed, there's no sense in
1346                    // re-rendering the dataflow.
1347                    if source_export.data_config != new_data_config {
1348                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
1349                        source_export.data_config = new_data_config;
1350
1351                        ingestions_to_run.insert(ingestion_id);
1352                    } else {
1353                        tracing::warn!(
1354                            "alter_ingestion_export_data_configs called on \
1355                                    export {source_export_id} of {ingestion_id} but \
1356                                    the data config was the same"
1357                        );
1358                    }
1359                }
1360                o => {
1361                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1362                    Err(StorageError::IdentifierInvalid(ingestion_id))?
1363                }
1364            }
1365        }
1366
1367        for id in ingestions_to_run {
1368            self.run_ingestion(id)?;
1369        }
1370        Ok(())
1371    }
1372
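    /// Evolves the schema of a table: registers `new_collection`, backed by
    /// the same data shard as `existing_collection`, under the new
    /// `RelationDesc`, and re-points the existing collection at the new
    /// primary collection.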
1373    async fn alter_table_desc(
1374        &mut self,
1375        existing_collection: GlobalId,
1376        new_collection: GlobalId,
1377        new_desc: RelationDesc,
1378        expected_version: RelationVersion,
1379        register_ts: Self::Timestamp,
1380    ) -> Result<(), StorageError<Self::Timestamp>> {
1381        let data_shard = {
1382            let Controller {
1383                collections,
1384                storage_collections,
1385                ..
1386            } = self;
1387
1388            let existing = collections
1389                .get(&existing_collection)
1390                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
1391            if !matches!(existing.data_source, DataSource::Table { .. }) {
1392                return Err(StorageError::IdentifierInvalid(existing_collection));
1393            }
1394
1395            // Let StorageCollections know!
1396            storage_collections
1397                .alter_table_desc(
1398                    existing_collection,
1399                    new_collection,
1400                    new_desc.clone(),
1401                    expected_version,
1402                )
1403                .await?;
1404
1405            existing.collection_metadata.data_shard.clone()
1406        };
1407
1408        let persist_client = self
1409            .persist
1410            .open(self.persist_location.clone())
1411            .await
1412            .expect("invalid persist location");
1413        let write_handle = self
1414            .open_data_handles(
1415                &existing_collection,
1416                data_shard,
1417                new_desc.clone(),
1418                &persist_client,
1419            )
1420            .await;
1421
1422        // Note: The new collection is now the "primary collection" so we specify `None` here.
1423        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
1424        let collection_meta = CollectionMetadata {
1425            persist_location: self.persist_location.clone(),
1426            data_shard,
1427            relation_desc: new_desc.clone(),
1428            // TODO(alter_table): Support schema evolution on sources.
1429            txns_shard: Some(self.txns_read.txns_id().clone()),
1430        };
1431        // TODO(alter_table): Support schema evolution on sources.
1432        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
1433        let collection_state = CollectionState::new(
1434            collection_desc.data_source.clone(),
1435            collection_meta,
1436            CollectionStateExtra::None,
1437            wallclock_lag_metrics,
1438        );
1439
1440        // Great! We have successfully evolved the schema of our table; now we need to
1441        // update our in-memory data structures.
1442        self.collections.insert(new_collection, collection_state);
1443        let existing = self
1444            .collections
1445            .get_mut(&existing_collection)
1446            .expect("missing existing collection");
1447        assert!(matches!(
1448            existing.data_source,
1449            DataSource::Table { primary: None }
1450        ));
1451        existing.data_source = DataSource::Table {
1452            primary: Some(new_collection),
1453        };
1454
1455        self.persist_table_worker
1456            .register(register_ts, vec![(new_collection, write_handle)])
1457            .await
1458            .expect("table worker unexpectedly shut down");
1459
1460        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);
1461
1462        Ok(())
1463    }
1464
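    /// Returns a reference to the `ExportState` of the given export, or
    /// `IdentifierMissing` if the collection does not exist or is not an
    /// export.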
1465    fn export(
1466        &self,
1467        id: GlobalId,
1468    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1469        self.collections
1470            .get(&id)
1471            .and_then(|c| match &c.extra_state {
1472                CollectionStateExtra::Export(state) => Some(state),
1473                _ => None,
1474            })
1475            .ok_or(StorageError::IdentifierMissing(id))
1476    }
1477
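    /// Like `export`, but returns a mutable reference to the `ExportState`.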
1478    fn export_mut(
1479        &mut self,
1480        id: GlobalId,
1481    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1482        self.collections
1483            .get_mut(&id)
1484            .and_then(|c| match &mut c.extra_state {
1485                CollectionStateExtra::Export(state) => Some(state),
1486                _ => None,
1487            })
1488            .ok_or(StorageError::IdentifierMissing(id))
1489    }
1490
1491    /// Create a oneshot ingestion.
1492    async fn create_oneshot_ingestion(
1493        &mut self,
1494        ingestion_id: uuid::Uuid,
1495        collection_id: GlobalId,
1496        instance_id: StorageInstanceId,
1497        request: OneshotIngestionRequest,
1498        result_tx: OneshotResultCallback<ProtoBatch>,
1499    ) -> Result<(), StorageError<Self::Timestamp>> {
1500        let collection_meta = self
1501            .collections
1502            .get(&collection_id)
1503            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1504            .collection_metadata
1505            .clone();
1506        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1507            // TODO(cf2): Refine this error.
1508            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1509        })?;
1510        let oneshot_cmd = RunOneshotIngestion {
1511            ingestion_id,
1512            collection_id,
1513            collection_meta,
1514            request,
1515        };
1516
1517        if !self.read_only {
1518            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1519            let pending = PendingOneshotIngestion {
1520                result_tx,
1521                cluster_id: instance_id,
1522            };
1523            let novel = self
1524                .pending_oneshot_ingestions
1525                .insert(ingestion_id, pending);
1526            assert_none!(novel);
1527            Ok(())
1528        } else {
1529            Err(StorageError::ReadOnly)
1530        }
1531    }
1532
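    /// Cancels an in-flight oneshot ingestion: removes it from the set of
    /// pending ingestions, tells the owning cluster to stop working on it, and
    /// notifies the waiting caller that the request was canceled. Returns an
    /// error in read-only mode.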
1533    fn cancel_oneshot_ingestion(
1534        &mut self,
1535        ingestion_id: uuid::Uuid,
1536    ) -> Result<(), StorageError<Self::Timestamp>> {
1537        if self.read_only {
1538            return Err(StorageError::ReadOnly);
1539        }
1540
1541        let pending = self
1542            .pending_oneshot_ingestions
1543            .remove(&ingestion_id)
1544            .ok_or_else(|| {
1545                // TODO(cf2): Refine this error.
1546                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1547            })?;
1548
1549        match self.instances.get_mut(&pending.cluster_id) {
1550            Some(instance) => {
1551                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1552            }
1553            None => {
1554                mz_ore::soft_panic_or_log!(
1555                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1556                    ingestion_id,
1557                    pending.cluster_id,
1558                );
1559            }
1560        }
1561        // Respond to the user that the request has been canceled.
1562        pending.cancel();
1563
1564        Ok(())
1565    }
1566
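    /// Replaces the description of an existing export, re-acquiring read holds
    /// on its input and output collections and sending an updated
    /// `RunSinkCommand` to the export's cluster.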
1567    async fn alter_export(
1568        &mut self,
1569        id: GlobalId,
1570        new_description: ExportDescription<Self::Timestamp>,
1571    ) -> Result<(), StorageError<Self::Timestamp>> {
1572        let from_id = new_description.sink.from;
1573
1574        // Acquire read holds at StorageCollections to ensure that the
1575        // sinked collection is not dropped while we're sinking it.
1576        let desired_read_holds = vec![from_id.clone(), id.clone()];
1577        let [input_hold, self_hold] = self
1578            .storage_collections
1579            .acquire_read_holds(desired_read_holds)
1580            .expect("missing dependency")
1581            .try_into()
1582            .expect("expected number of holds");
1583        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1584        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1585
1586        // Check whether the sink's write frontier is beyond the read hold we got
1587        let cur_export = self.export_mut(id)?;
1588        let input_readable = cur_export
1589            .write_frontier
1590            .iter()
1591            .all(|t| input_hold.since().less_than(t));
1592        if !input_readable {
1593            return Err(StorageError::ReadBeforeSince(from_id));
1594        }
1595
1596        let new_export = ExportState {
1597            read_capabilities: cur_export.read_capabilities.clone(),
1598            cluster_id: new_description.instance_id,
1599            derived_since: cur_export.derived_since.clone(),
1600            read_holds: [input_hold, self_hold],
1601            read_policy: cur_export.read_policy.clone(),
1602            write_frontier: cur_export.write_frontier.clone(),
1603        };
1604        *cur_export = new_export;
1605
1606        let cmd = RunSinkCommand {
1607            id,
1608            description: StorageSinkDesc {
1609                from: from_id,
1610                from_desc: new_description.sink.from_desc,
1611                connection: new_description.sink.connection,
1612                envelope: new_description.sink.envelope,
1613                as_of: new_description.sink.as_of,
1614                version: new_description.sink.version,
1615                from_storage_metadata,
1616                with_snapshot: new_description.sink.with_snapshot,
1617                to_storage_metadata,
1618            },
1619        };
1620
1621        // Fetch the client for this export's cluster.
1622        let instance = self
1623            .instances
1624            .get_mut(&new_description.instance_id)
1625            .ok_or_else(|| StorageError::ExportInstanceMissing {
1626                storage_instance_id: new_description.instance_id,
1627                export_id: id,
1628            })?;
1629
1630        instance.send(StorageCommand::RunSink(Box::new(cmd)));
1631        Ok(())
1632    }
1633
1634    /// Update the connections of the sinks identified in `exports` and re-render them.
1635    async fn alter_export_connections(
1636        &mut self,
1637        exports: BTreeMap<GlobalId, StorageSinkConnection>,
1638    ) -> Result<(), StorageError<Self::Timestamp>> {
1639        let mut updates_by_instance =
1640            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();
1641
1642        for (id, connection) in exports {
1643            // We stage changes in new_export_description and then apply all
1644            // updates to exports at the end.
1645            //
1646            // We don't just go ahead and clone the `ExportState` itself and
1647            // update that because `ExportState` is not `Clone`: it holds
1648            // a `ReadHandle`, and cloning that would cause additional work for
1649            // whoever guarantees those read holds.
1650            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
1651                let export = &self.collections[&id];
1652                let DataSource::Sink { desc } = &export.data_source else {
1653                    panic!("export exists")
1654                };
1655                let CollectionStateExtra::Export(state) = &export.extra_state else {
1656                    panic!("export exists")
1657                };
1658                let export_description = desc.clone();
1659                let as_of = state.input_hold().since().clone();
1660
1661                (export_description, as_of)
1662            };
1663            let current_sink = new_export_description.sink.clone();
1664
1665            new_export_description.sink.connection = connection;
1666
1667            // Ensure compatibility
1668            current_sink.alter_compatible(id, &new_export_description.sink)?;
1669
1670            let from_storage_metadata = self
1671                .storage_collections
1672                .collection_metadata(new_export_description.sink.from)?;
1673            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1674
1675            let cmd = RunSinkCommand {
1676                id,
1677                description: StorageSinkDesc {
1678                    from: new_export_description.sink.from,
1679                    from_desc: new_export_description.sink.from_desc.clone(),
1680                    connection: new_export_description.sink.connection.clone(),
1681                    envelope: new_export_description.sink.envelope,
1682                    with_snapshot: new_export_description.sink.with_snapshot,
1683                    version: new_export_description.sink.version,
1684                    // Here we are about to send a RunSinkCommand with the current read capability
1685                    // held by this sink. However, clusters are already running a version of the
1686                    // sink and nothing guarantees that by the time this command arrives at the
1687                    // clusters they won't have made additional progress such that this read
1688                    // capability is invalidated.
1689                    // The solution to this problem is for the controller to track specific
1690                    // executions of dataflows such that it can track the shutdown of the current
1691                    // instance and the initialization of the new instance separately and ensure
1692                    // read holds are held for the correct amount of time.
1693                    // TODO(petrosagg): change the controller to explicitly track dataflow executions
1694                    as_of: as_of.to_owned(),
1695                    from_storage_metadata,
1696                    to_storage_metadata,
1697                },
1698            };
1699
1700            let update = updates_by_instance
1701                .entry(new_export_description.instance_id)
1702                .or_default();
1703            update.push((cmd, new_export_description));
1704        }
1705
1706        for (instance_id, updates) in updates_by_instance {
1707            let mut export_updates = BTreeMap::new();
1708            let mut cmds = Vec::with_capacity(updates.len());
1709
1710            for (cmd, export_state) in updates {
1711                export_updates.insert(cmd.id, export_state);
1712                cmds.push(cmd);
1713            }
1714
1715            // Fetch the client for this export's cluster.
1716            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1717                StorageError::ExportInstanceMissing {
1718                    storage_instance_id: instance_id,
1719                    export_id: *export_updates
1720                        .keys()
1721                        .next()
1722                        .expect("set of exports not empty"),
1723                }
1724            })?;
1725
1726            for cmd in cmds {
1727                instance.send(StorageCommand::RunSink(Box::new(cmd)));
1728            }
1729
1730            // Update state only after all fallible operations have succeeded.
1731            for (id, new_export_description) in export_updates {
1732                let Some(state) = self.collections.get_mut(&id) else {
1733                    panic!("export known to exist")
1734                };
1735                let DataSource::Sink { desc } = &mut state.data_source else {
1736                    panic!("export known to exist")
1737                };
1738                *desc = new_export_description;
1739            }
1740        }
1741
1742        Ok(())
1743    }
1744
1745    // Dropping a table roughly follows this flow:
1746    //
1747    // First determine if this is a TableWrites table or a source-fed table (an IngestionExport):
1748    //
1749    // If this is a TableWrites table:
1750    //   1. We remove the table from the persist table write worker.
1751    //   2. The table removal is awaited in an async task.
1752    //   3. A message is sent to the storage controller that the table has been removed from the
1753    //      table write worker.
1754    //   4. The controller drains all table drop messages during `process`.
1755    //   5. `process` calls `drop_sources` with the dropped tables.
1756    //
1757    // If this is an IngestionExport table:
1758    //   1. We validate the ids and then call `drop_sources_unvalidated` to proceed with dropping them.
1759    fn drop_tables(
1760        &mut self,
1761        storage_metadata: &StorageMetadata,
1762        identifiers: Vec<GlobalId>,
1763        ts: Self::Timestamp,
1764    ) -> Result<(), StorageError<Self::Timestamp>> {
1765        // Collect tables by their data_source
1766        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1767            .into_iter()
1768            .partition(|id| match self.collections[id].data_source {
1769                DataSource::Table { .. } => true,
1770                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1771                _ => panic!("identifier is not a table: {}", id),
1772            });
1773
1774        // Drop table write tables
1775        if !table_write_ids.is_empty() {
1776            let drop_notif = self
1777                .persist_table_worker
1778                .drop_handles(table_write_ids.clone(), ts);
1779            let tx = self.pending_table_handle_drops_tx.clone();
1780            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1781                drop_notif.await;
1782                for identifier in table_write_ids {
1783                    let _ = tx.send(identifier);
1784                }
1785            });
1786        }
1787
1788        // Drop source-fed tables
1789        if !data_source_ids.is_empty() {
1790            self.validate_collection_ids(data_source_ids.iter().cloned())?;
1791            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1792        }
1793
1794        Ok(())
1795    }
1796
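    /// Validates that the given identifiers refer to known collections and
    /// then drops them via `drop_sources_unvalidated`.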
1797    fn drop_sources(
1798        &mut self,
1799        storage_metadata: &StorageMetadata,
1800        identifiers: Vec<GlobalId>,
1801    ) -> Result<(), StorageError<Self::Timestamp>> {
1802        self.validate_collection_ids(identifiers.iter().cloned())?;
1803        self.drop_sources_unvalidated(storage_metadata, identifiers)
1804    }
1805
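    /// Drops the given source collections without validating the identifiers:
    /// re-renders ingestions that lose a source export, releases read holds by
    /// installing an empty-frontier read policy, records `Dropped` status
    /// updates, and cleans up statistics, in-memory state, and the
    /// collection->shard mappings.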
1806    fn drop_sources_unvalidated(
1807        &mut self,
1808        storage_metadata: &StorageMetadata,
1809        ids: Vec<GlobalId>,
1810    ) -> Result<(), StorageError<Self::Timestamp>> {
1811        // Keep track of which ingestions we still have to execute, and which we
1812        // have to change because of dropped subsources.
1813        let mut ingestions_to_execute = BTreeSet::new();
1814        let mut ingestions_to_drop = BTreeSet::new();
1815        let mut source_statistics_to_drop = Vec::new();
1816
1817        // Ingestions (and their exports) are also collections, but we keep
1818        // track of non-ingestion collections separately, because they have
1819        // slightly different cleanup logic below.
1820        let mut collections_to_drop = Vec::new();
1821
1822        for id in ids.iter() {
1823            let metadata = storage_metadata.get_collection_shard::<T>(*id);
1824            mz_ore::soft_assert_or_log!(
1825                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
1826                "dropping {id}, but drop was not synchronized with storage \
1827                controller via `synchronize_collections`"
1828            );
1829
1830            let collection_state = self.collections.get(id);
1831
1832            if let Some(collection_state) = collection_state {
1833                match collection_state.data_source {
1834                    DataSource::Webhook => {
1835                        // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
1836                        // could probably use some love and maybe get merged together?
1837                        let fut = self.collection_manager.unregister_collection(*id);
1838                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);
1839
1840                        collections_to_drop.push(*id);
1841                        source_statistics_to_drop.push(*id);
1842                    }
1843                    DataSource::Ingestion(_) => {
1844                        ingestions_to_drop.insert(*id);
1845                        source_statistics_to_drop.push(*id);
1846                    }
1847                    DataSource::IngestionExport { ingestion_id, .. } => {
1848                        // If we are dropping source exports, we need to modify the
1849                        // ingestion that it runs on.
1850                        //
1851                        // If we remove this export, we need to stop producing data to
1852                        // it, so plan to re-execute the ingestion with the amended
1853                        // description.
1854                        ingestions_to_execute.insert(ingestion_id);
1855
1856                        // Adjust the source to remove this export.
1857                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
1858                            Some(ingestion_collection) => ingestion_collection,
1859                            // Primary ingestion already dropped.
1860                            None => {
1861                                tracing::error!(
1862                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
1863                                );
1864                                continue;
1865                            }
1866                        };
1867
1868                        match &mut ingestion_state.data_source {
1869                            DataSource::Ingestion(ingestion_desc) => {
1870                                let removed = ingestion_desc.source_exports.remove(id);
1871                                mz_ore::soft_assert_or_log!(
1872                                    removed.is_some(),
1873                                    "dropped subsource {id} already removed from source exports"
1874                                );
1875                            }
1876                            _ => unreachable!(
1877                                "SourceExport must only refer to primary sources that already exist"
1878                            ),
1879                        };
1880
1881                        // Ingestion exports also have ReadHolds that we need to
1882                        // downgrade, and much of their drop machinery is the
1883                        // same as for the "main" ingestion.
1884                        ingestions_to_drop.insert(*id);
1885                        source_statistics_to_drop.push(*id);
1886                    }
1887                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
1888                        collections_to_drop.push(*id);
1889                    }
1890                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
1891                        // Collections of these types are either not sources and should be dropped
1892                        // through other means, or are sources but should never be dropped.
1893                        soft_panic_or_log!(
1894                            "drop_sources called on a {:?} (id={id}))",
1895                            collection_state.data_source,
1896                        );
1897                    }
1898                }
1899            }
1900        }
1901
1902        // Do not bother re-executing ingestions we know we plan to drop.
1903        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
1904        for ingestion_id in ingestions_to_execute {
1905            self.run_ingestion(ingestion_id)?;
1906        }
1907
1908        // For ingestions, we fabricate a new hold that will propagate through
1909        // the cluster and then back to us.
1910
1911        // We don't explicitly remove read capabilities! Downgrading the
1912        // frontier of the source to `[]` (the empty Antichain) will propagate
1913        // to the storage dependencies.
1914        let ingestion_policies = ingestions_to_drop
1915            .iter()
1916            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
1917            .collect();
1918
1919        tracing::debug!(
1920            ?ingestion_policies,
1921            "dropping sources by setting read hold policies"
1922        );
1923        self.set_hold_policies(ingestion_policies);
1924
1925        // Delete all collection->shard mappings
1926        let shards_to_update: BTreeSet<_> = ingestions_to_drop
1927            .iter()
1928            .chain(collections_to_drop.iter())
1929            .cloned()
1930            .collect();
1931        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);
1932
1933        let status_now = mz_ore::now::to_datetime((self.now)());
1934        let mut status_updates = vec![];
1935        for id in ingestions_to_drop.iter() {
1936            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
1937        }
1938
1939        if !self.read_only {
1940            self.append_status_introspection_updates(
1941                IntrospectionType::SourceStatusHistory,
1942                status_updates,
1943            );
1944        }
1945
1946        {
1947            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1948            for id in source_statistics_to_drop {
1949                source_statistics
1950                    .source_statistics
1951                    .retain(|(stats_id, _), _| stats_id != &id);
1952                source_statistics
1953                    .webhook_statistics
1954                    .retain(|stats_id, _| stats_id != &id);
1955            }
1956        }
1957
1958        // Remove collection state
1959        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
1960            tracing::info!(%id, "dropping collection state");
1961            let collection = self
1962                .collections
1963                .remove(id)
1964                .expect("list populated after checking that self.collections contains it");
1965
1966            let instance = match &collection.extra_state {
1967                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
1968                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
1969                CollectionStateExtra::None => None,
1970            }
1971            .and_then(|i| self.instances.get(&i));
1972
1973            // Record which replicas were running a collection, so that we can
1974            // match `DroppedId` messages against them and eventually remove state
1975            // from `self.dropped_objects`.
1976            if let Some(instance) = instance {
1977                let active_replicas = instance.get_active_replicas_for_object(id);
1978                if !active_replicas.is_empty() {
1979                    // The remap collection of an ingestion doesn't have extra
1980                    // state and doesn't have an instance_id, but we still get
1981                    // upper updates for it, so we want to make sure to populate
1982                    // `self.dropped_objects`.
1983                    // TODO(aljoscha): All this is a bit icky. But here we are
1984                    // for now...
1985                    match &collection.data_source {
1986                        DataSource::Ingestion(ingestion_desc) => {
1987                            if *id != ingestion_desc.remap_collection_id {
1988                                self.dropped_objects.insert(
1989                                    ingestion_desc.remap_collection_id,
1990                                    active_replicas.clone(),
1991                                );
1992                            }
1993                        }
1994                        _ => {}
1995                    }
1996
1997                    self.dropped_objects.insert(*id, active_replicas);
1998                }
1999            }
2000        }
2001
2002        // Also let StorageCollections know!
2003        self.storage_collections
2004            .drop_collections_unvalidated(storage_metadata, ids);
2005
2006        Ok(())
2007    }
2008
2009    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
2010    fn drop_sinks(
2011        &mut self,
2012        storage_metadata: &StorageMetadata,
2013        identifiers: Vec<GlobalId>,
2014    ) -> Result<(), StorageError<Self::Timestamp>> {
2015        self.validate_export_ids(identifiers.iter().cloned())?;
2016        self.drop_sinks_unvalidated(storage_metadata, identifiers);
2017        Ok(())
2018    }
2019
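    /// Drops the given sinks without validating the identifiers: releases
    /// their read holds by installing an empty-frontier read policy, records
    /// `Dropped` status updates, and removes their statistics and in-memory
    /// state.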
2020    fn drop_sinks_unvalidated(
2021        &mut self,
2022        storage_metadata: &StorageMetadata,
2023        mut sinks_to_drop: Vec<GlobalId>,
2024    ) {
2025        // Ignore exports that have already been removed.
2026        sinks_to_drop.retain(|id| self.export(*id).is_ok());
2027
2028        // TODO: ideally we'd advance the write frontier ourselves here, but this function's
2029        // not yet marked async.
2030
2031        // We don't explicitly remove read capabilities! Downgrading the
2032        // frontier of the sink to `[]` (the empty Antichain) will propagate
2033        // to the storage dependencies.
2034        let drop_policy = sinks_to_drop
2035            .iter()
2036            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2037            .collect();
2038
2039        tracing::debug!(
2040            ?drop_policy,
2041            "dropping sources by setting read hold policies"
2042        );
2043        self.set_hold_policies(drop_policy);
2044
2045        // Record the drop status for all sink drops.
2046        //
2047        // We also delete the items' statistics objects.
2048        //
2049        // The locks are held for a short time, only while we do some removals from a map.
2050
2051        let status_now = mz_ore::now::to_datetime((self.now)());
2052
2053        // Record the drop status for all pending sink drops.
2054        let mut status_updates = vec![];
2055        {
2056            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2057            for id in sinks_to_drop.iter() {
2058                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2059                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
2060            }
2061        }
2062
2063        if !self.read_only {
2064            self.append_status_introspection_updates(
2065                IntrospectionType::SinkStatusHistory,
2066                status_updates,
2067            );
2068        }
2069
2070        // Remove collection/export state
2071        for id in sinks_to_drop.iter() {
2072            tracing::info!(%id, "dropping export state");
2073            let collection = self
2074                .collections
2075                .remove(id)
2076                .expect("list populated after checking that self.collections contains it");
2077
2078            let instance = match &collection.extra_state {
2079                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2080                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2081                CollectionStateExtra::None => None,
2082            }
2083            .and_then(|i| self.instances.get(&i));
2084
2085            // Record how many replicas were running an export, so that we can
2086            // match `DroppedId` messages against it and eventually remove state
2087            // from `self.dropped_objects`.
2088            if let Some(instance) = instance {
2089                let active_replicas = instance.get_active_replicas_for_object(id);
2090                if !active_replicas.is_empty() {
2091                    self.dropped_objects.insert(*id, active_replicas);
2092                }
2093            }
2094        }
2095
2096        // Also let StorageCollections know!
2097        self.storage_collections
2098            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2099    }
2100
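    /// Appends `commands` to the given tables at `write_ts` and advances their
    /// uppers to `advance_to`, returning a receiver that reports the result
    /// once the persist table worker has processed the write. In read-only
    /// mode, only migrated system tables may be written.
    ///
    /// A minimal usage sketch (not compiled); `controller`, `table_id`, and
    /// `updates` are hypothetical bindings:
    ///
    /// ```ignore
    /// let rx = controller.append_table(write_ts, advance_to, vec![(table_id, updates)])?;
    /// rx.await.expect("table worker dropped the channel")?;
    /// ```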
2101    #[instrument(level = "debug")]
2102    fn append_table(
2103        &mut self,
2104        write_ts: Self::Timestamp,
2105        advance_to: Self::Timestamp,
2106        commands: Vec<(GlobalId, Vec<TableData>)>,
2107    ) -> Result<
2108        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2109        StorageError<Self::Timestamp>,
2110    > {
2111        if self.read_only {
2112            // While in read-only mode, ONLY collections that have been migrated
2113            // and need to be re-hydrated in read-only mode can be written to.
2114            if !commands
2115                .iter()
2116                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2117            {
2118                return Err(StorageError::ReadOnly);
2119            }
2120        }
2121
2122        // TODO(petrosagg): validate appends against the expected RelationDesc of the collection
2123        for (id, updates) in commands.iter() {
2124            if !updates.is_empty() {
2125                if !write_ts.less_than(&advance_to) {
2126                    return Err(StorageError::UpdateBeyondUpper(*id));
2127                }
2128            }
2129        }
2130
2131        Ok(self
2132            .persist_table_worker
2133            .append(write_ts, advance_to, commands))
2134    }
2135
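    /// Returns a `MonotonicAppender` for the given collection, which can be
    /// used to monotonically append updates via the collection manager.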
2136    fn monotonic_appender(
2137        &self,
2138        id: GlobalId,
2139    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
2140        self.collection_manager.monotonic_appender(id)
2141    }
2142
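    /// Returns a handle to the shared webhook statistics of the given
    /// collection, or an error if none are tracked for it.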
2143    fn webhook_statistics(
2144        &self,
2145        id: GlobalId,
2146    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2147        // Calls to this method are usually cached, so the lock is not in the critical path.
2148        let source_statistics = self.source_statistics.lock().expect("poisoned");
2149        source_statistics
2150            .webhook_statistics
2151            .get(&id)
2152            .cloned()
2153            .ok_or(StorageError::IdentifierMissing(id))
2154    }
2155
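    /// Waits until the controller is ready to process, i.e. until maintenance
    /// is due, table drops are pending, or responses from replicas have
    /// arrived.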
2156    async fn ready(&mut self) {
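        // If maintenance is already scheduled or table drops are pending,
        // `process` has work to do, so return immediately rather than waiting
        // for new replica responses.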
2157        if self.maintenance_scheduled {
2158            return;
2159        }
2160
2161        if !self.pending_table_handle_drops_rx.is_empty() {
2162            return;
2163        }
2164
2165        tokio::select! {
2166            Some(m) = self.instance_response_rx.recv() => {
2167                self.stashed_responses.push(m);
2168                while let Ok(m) = self.instance_response_rx.try_recv() {
2169                    self.stashed_responses.push(m);
2170                }
2171            }
2172            _ = self.maintenance_ticker.tick() => {
2173                self.maintenance_scheduled = true;
2174            },
2175        };
2176    }
2177
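    /// Processes work that has accumulated since the last call: performs
    /// maintenance, rehydrates failed replicas, applies stashed replica
    /// responses (frontier updates, drop acknowledgements, statistics, status
    /// updates, staged batches), and drops tables whose persist handles have
    /// been released. Returns the accumulated frontier updates, if any.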
2178    #[instrument(level = "debug")]
2179    fn process(
2180        &mut self,
2181        storage_metadata: &StorageMetadata,
2182    ) -> Result<Option<Response<T>>, anyhow::Error> {
2183        // Perform periodic maintenance work.
2184        if self.maintenance_scheduled {
2185            self.maintain();
2186            self.maintenance_scheduled = false;
2187        }
2188
2189        for instance in self.instances.values_mut() {
2190            instance.rehydrate_failed_replicas();
2191        }
2192
2193        let mut status_updates = vec![];
2194        let mut updated_frontiers = BTreeMap::new();
2195
2196        // Take the currently stashed responses so that we can call `&mut self` methods in the loop.
2197        let stashed_responses = std::mem::take(&mut self.stashed_responses);
2198        for resp in stashed_responses {
2199            match resp {
2200                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
2201                    self.update_write_frontier(id, &upper);
2202                    updated_frontiers.insert(id, upper);
2203                }
2204                (replica_id, StorageResponse::DroppedId(id)) => {
2205                    let replica_id = replica_id.expect("DroppedId from unknown replica");
2206                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
2207                        remaining_replicas.remove(&replica_id);
2208                        if remaining_replicas.is_empty() {
2209                            self.dropped_objects.remove(&id);
2210                        }
2211                    } else {
2212                        soft_panic_or_log!("unexpected DroppedId for {id}");
2213                    }
2214                }
2215                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
2216                    // Note we only hold the locks while moving some plain-old-data around here.
2217                    {
2218                        // NOTE(aljoscha): We explicitly unwrap the `Option`,
2219                        // because we expect that stats coming from replicas
2220                        // have a replica id.
2221                        //
2222                        // If this were `None` and we used it to access
2223                        // state below, we might clobber something unexpectedly.
2224                        let replica_id = if let Some(replica_id) = replica_id {
2225                            replica_id
2226                        } else {
2227                            tracing::error!(
2228                                ?source_stats,
2229                                "missing replica_id for source statistics update"
2230                            );
2231                            continue;
2232                        };
2233
2234                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");
2235
2236                        for stat in source_stats {
2237                            let collection_id = stat.id.clone();
2238
2239                            if self.collection(collection_id).is_err() {
2240                                // We can get updates for collections that have
2241                                // already been deleted; ignore those.
2242                                continue;
2243                            }
2244
2245                            let entry = shared_stats
2246                                .source_statistics
2247                                .entry((stat.id, Some(replica_id)));
2248
2249                            match entry {
2250                                btree_map::Entry::Vacant(vacant_entry) => {
2251                                    let mut stats = ControllerSourceStatistics::new(
2252                                        collection_id,
2253                                        Some(replica_id),
2254                                    );
2255                                    stats.incorporate(stat);
2256                                    vacant_entry.insert(stats);
2257                                }
2258                                btree_map::Entry::Occupied(mut occupied_entry) => {
2259                                    occupied_entry.get_mut().incorporate(stat);
2260                                }
2261                            }
2262                        }
2263                    }
2264
2265                    {
2266                        // NOTE(aljoscha); Same as above. We want to be
2267                        // explicit.
2268                        //
2269                        // Also, technically for sinks there are no webhook
2270                        // "sources" that would force us to use an `Option`. But
2271                        // we still have to use an option for other reasons: the
2272                        // scraper expects a trait for working with stats, and
2273                        // that in the end forces the sink stats map to also
2274                        // have `Option` in the key. Do I like that? No, but
2275                        // here we are.
2276                        let replica_id = if let Some(replica_id) = replica_id {
2277                            replica_id
2278                        } else {
2279                            tracing::error!(
2280                                ?sink_stats,
2281                                "missing replica_id for sink statistics update"
2282                            );
2283                            continue;
2284                        };
2285
2286                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
2287
2288                        for stat in sink_stats {
2289                            let collection_id = stat.id.clone();
2290
2291                            if self.collection(collection_id).is_err() {
2292                                // We can get updates for collections that have
2293                                // already been deleted; ignore those.
2294                                continue;
2295                            }
2296
2297                            let entry = shared_stats.entry((stat.id, Some(replica_id)));
2298
2299                            match entry {
2300                                btree_map::Entry::Vacant(vacant_entry) => {
2301                                    let mut stats =
2302                                        ControllerSinkStatistics::new(collection_id, replica_id);
2303                                    stats.incorporate(stat);
2304                                    vacant_entry.insert(stats);
2305                                }
2306                                btree_map::Entry::Occupied(mut occupied_entry) => {
2307                                    occupied_entry.get_mut().incorporate(stat);
2308                                }
2309                            }
2310                        }
2311                    }
2312                }
2313                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
2314                    // NOTE(aljoscha): We sniff out the hydration status for
2315                    // ingestions from status updates. This is the easiest thing we
2316                    // can do right now, without going deeper into changing the
2317                    // comms protocol between controller and cluster. We cannot,
2318                    // for example, use `StorageResponse::FrontierUpper`,
2319                    // because those will already get sent when the ingestion is
2320                    // just being created.
2321                    //
2322                    // Sources differ in when they will report as Running. Kafka
2323                    // UPSERT sources will only switch to `Running` once their
2324                    // state has been initialized from persist, which is the
2325                    // first case that we care about right now.
2326                    //
2327                    // I wouldn't say it's ideal, but it's workable until we
2328                    // find something better.
2329                    match status_update.status {
2330                        Status::Running => {
2331                            let collection = self.collections.get_mut(&status_update.id);
2332                            match collection {
2333                                Some(collection) => {
2334                                    match collection.extra_state {
2335                                        CollectionStateExtra::Ingestion(
2336                                            ref mut ingestion_state,
2337                                        ) => {
2338                                            if ingestion_state.hydrated_on.is_empty() {
2339                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
2340                                            }
2341                                            ingestion_state.hydrated_on.insert(replica_id.expect(
2342                                                "replica id should be present for status running",
2343                                            ));
2344                                        }
2345                                        CollectionStateExtra::Export(_) => {
2346                                            // TODO(sinks): track sink hydration?
2347                                        }
2348                                        CollectionStateExtra::None => {
2349                                            // Nothing to do
2350                                        }
2351                                    }
2352                                }
2353                                None => (), // no collection, let's say that's fine
2354                                            // here
2355                            }
2356                        }
2357                        Status::Paused => {
2358                            let collection = self.collections.get_mut(&status_update.id);
2359                            match collection {
2360                                Some(collection) => {
2361                                    match collection.extra_state {
2362                                        CollectionStateExtra::Ingestion(
2363                                            ref mut ingestion_state,
2364                                        ) => {
2365                                            // TODO: Paused gets sent when there
2366                                            // are no active replicas. We should
2367                                            // change this to send a targeted
2368                                            // Pause for each replica, and do
2369                                            // more fine-grained hydration
2370                                            // tracking here.
2371                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
2372                                            ingestion_state.hydrated_on.clear();
2373                                        }
2374                                        CollectionStateExtra::Export(_) => {
2375                                            // TODO(sinks): track sink hydration?
2376                                        }
2377                                        CollectionStateExtra::None => {
2378                                            // Nothing to do
2379                                        }
2380                                    }
2381                                }
2382                                None => (), // no collection, let's say that's fine
2383                                            // here
2384                            }
2385                        }
2386                        _ => (),
2387                    }
2388
2389                    // Set replica_id in the status update if available
2390                    if let Some(id) = replica_id {
2391                        status_update.replica_id = Some(id);
2392                    }
2393                    status_updates.push(status_update);
2394                }
2395                (_replica_id, StorageResponse::StagedBatches(batches)) => {
2396                    for (ingestion_id, batches) in batches {
2397                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
2398                            Some(pending) => {
2399                                // Send a cancel command so our command history is correct, and to
2400                                // avoid duplicate work once we have active replication.
2401                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
2402                                {
2403                                    instance
2404                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
2405                                }
2406                                // Send the results down our channel.
2407                                (pending.result_tx)(batches)
2408                            }
2409                            None => {
2410                                // We might not be tracking this oneshot ingestion anymore because
2411                                // it was canceled.
2412                            }
2413                        }
2414                    }
2415                }
2416            }
2417        }
2418
2419        self.record_status_updates(status_updates);
2420
2421        // Process dropped tables in a single batch.
2422        let mut dropped_table_ids = Vec::new();
2423        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
2424            dropped_table_ids.push(dropped_id);
2425        }
2426        if !dropped_table_ids.is_empty() {
2427            self.drop_sources(storage_metadata, dropped_table_ids)?;
2428        }
2429
2430        if updated_frontiers.is_empty() {
2431            Ok(None)
2432        } else {
2433            Ok(Some(Response::FrontierUpdates(
2434                updated_frontiers.into_iter().collect(),
2435            )))
2436        }
2437    }
2438
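    /// Returns the raw persist shard state backing the given collection as a
    /// JSON value, for introspection and debugging.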
2439    async fn inspect_persist_state(
2440        &self,
2441        id: GlobalId,
2442    ) -> Result<serde_json::Value, anyhow::Error> {
2443        let collection = &self.storage_collections.collection_metadata(id)?;
2444        let client = self
2445            .persist
2446            .open(collection.persist_location.clone())
2447            .await?;
2448        let shard_state = client
2449            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2450            .await?;
2451        let json_state = serde_json::to_value(shard_state)?;
2452        Ok(json_state)
2453    }
2454
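    /// Writes the given updates to the introspection collection of the given
    /// type via the collection manager's blind-write path.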
2455    fn append_introspection_updates(
2456        &mut self,
2457        type_: IntrospectionType,
2458        updates: Vec<(Row, Diff)>,
2459    ) {
2460        let id = self.introspection_ids[&type_];
2461        let updates = updates.into_iter().map(|update| update.into()).collect();
2462        self.collection_manager.blind_write(id, updates);
2463    }
2464
2465    fn append_status_introspection_updates(
2466        &mut self,
2467        type_: IntrospectionType,
2468        updates: Vec<StatusUpdate>,
2469    ) {
2470        let id = self.introspection_ids[&type_];
2471        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2472        if !updates.is_empty() {
2473            self.collection_manager.blind_write(id, updates);
2474        }
2475    }
2476
2477    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2478        let id = self.introspection_ids[&type_];
2479        self.collection_manager.differential_write(id, op);
2480    }
2481
2482    fn append_only_introspection_tx(
2483        &self,
2484        type_: IntrospectionType,
2485    ) -> mpsc::UnboundedSender<(
2486        Vec<AppendOnlyUpdate>,
2487        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2488    )> {
2489        let id = self.introspection_ids[&type_];
2490        self.collection_manager.append_only_write_sender(id)
2491    }
2492
2493    fn differential_introspection_tx(
2494        &self,
2495        type_: IntrospectionType,
2496    ) -> mpsc::UnboundedSender<(
2497        StorageWriteOp,
2498        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2499    )> {
2500        let id = self.introspection_ids[&type_];
2501        self.collection_manager.differential_write_sender(id)
2502    }
2503
2504    async fn real_time_recent_timestamp(
2505        &self,
2506        timestamp_objects: BTreeSet<GlobalId>,
2507        timeout: Duration,
2508    ) -> Result<
2509        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
2510        StorageError<Self::Timestamp>,
2511    > {
2512        use mz_storage_types::sources::GenericSourceConnection;
2513
2514        let mut rtr_futures = BTreeMap::new();
2515
2516        // Only user sources can be read from w/ RTR.
2517        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
2518            let collection = match self.collection(id) {
2519                Ok(c) => c,
2520                // Not a storage item, which we accept.
2521                Err(_) => continue,
2522            };
2523
2524            let (source_conn, remap_id) = match &collection.data_source {
2525                DataSource::Ingestion(IngestionDescription {
2526                    desc: SourceDesc { connection, .. },
2527                    remap_collection_id,
2528                    ..
2529                }) => match connection {
2530                    GenericSourceConnection::Kafka(_)
2531                    | GenericSourceConnection::Postgres(_)
2532                    | GenericSourceConnection::MySql(_)
2533                    | GenericSourceConnection::SqlServer(_) => {
2534                        (connection.clone(), *remap_collection_id)
2535                    }
2536
2537                    // These internal sources do not yet (and might never)
2538                    // support RTR. However, erroring when they are selected from
2539                    // would make for an annoying user experience, so instead just skip
2540                    // over them.
2541                    GenericSourceConnection::LoadGenerator(_) => continue,
2542                },
2543                // Skip over all other objects
2544                _ => {
2545                    continue;
2546                }
2547            };
2548
2549            // Prepare for getting the external system's frontier.
2550            let config = self.config().clone();
2551
2552            // Determine the remap collection we plan to read from.
2553            //
2554            // Note that reading from the remap shard here mirrors what other
2555            // parts of this code do that read the remap shard, but we inline it
2556            // here because we must prove that we have not taken ownership of
2557            // `self` in order to move the stream of data from the remap shard
2558            // into a future.
2559            let read_handle = self.read_handle_for_snapshot(remap_id).await?;
2560
2561            // Have to acquire a read hold to prevent the since from advancing
2562            // while we read.
2563            let remap_read_hold = self
2564                .storage_collections
2565                .acquire_read_holds(vec![remap_id])
2566                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
2567                .expect_element(|| "known to be exactly one");
2568
2569            let remap_as_of = remap_read_hold
2570                .since()
2571                .to_owned()
2572                .into_option()
2573                .ok_or(StorageError::ReadBeforeSince(remap_id))?;
2574
2575            rtr_futures.insert(
2576                id,
2577                tokio::time::timeout(timeout, async move {
2578                    use mz_storage_types::sources::SourceConnection as _;
2579
2580                    // Fetch the remap shard's contents; we must do this first so
2581                    // that the `as_of` doesn't change.
2582                    let as_of = Antichain::from_elem(remap_as_of);
2583                    let remap_subscribe = read_handle
2584                        .subscribe(as_of.clone())
2585                        .await
2586                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;
2587
2588                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");
2589
2590                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
2591                        .await.map_err(|e| {
2592                            tracing::debug!(?id, "real time recency error: {:?}", e);
2593                            e
2594                        });
2595
2596                    // Drop the read hold once we have read successfully.
2597                    drop(remap_read_hold);
2598
2599                    result
2600                }),
2601            );
2602        }
2603
2604        Ok(Box::pin(async move {
2605            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
2606            ids.into_iter()
2607                .zip_eq(futures::future::join_all(futs).await)
2608                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
2609                    let new =
2610                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
2611                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
2612                })
2613        }))
2614    }
2615}
2616
2617/// Seed [`StorageTxn`] with any state required to instantiate a
2618/// [`StorageController`].
2619///
2620/// This cannot be a member of [`StorageController`] because it cannot take a
2621/// `self` parameter.
2622///
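/// A minimal usage sketch (the setup and controller arguments are elided and
/// hypothetical):
///
/// ```ignore
/// // Seed the txn-wal shard before constructing the controller ...
/// prepare_initialization(&mut txn)?;
/// // ... only then is it safe to create (and later reconcile) the controller.
/// let controller = Controller::new(/* ... */, &txn, /* ... */).await;
/// ```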
2623pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2624    if txn.get_txn_wal_shard().is_none() {
2625        let txns_id = ShardId::new();
2626        txn.write_txn_wal_shard(txns_id)?;
2627    }
2628
2629    Ok(())
2630}
2631
2632impl<T> Controller<T>
2633where
2634    T: Timestamp
2635        + Lattice
2636        + TotalOrder
2637        + Codec64
2638        + From<EpochMillis>
2639        + TimestampManipulation
2640        + Into<Datum<'static>>,
2641    Self: StorageController<Timestamp = T>,
2642{
2643    /// Create a new storage controller from a client it should wrap.
2644    ///
2645    /// Note that when creating a new storage controller, you must also
2646    /// reconcile it with the previous state.
2647    ///
2648    /// # Panics
2649    /// If this function is called before [`prepare_initialization`].
2650    pub async fn new(
2651        build_info: &'static BuildInfo,
2652        persist_location: PersistLocation,
2653        persist_clients: Arc<PersistClientCache>,
2654        now: NowFn,
2655        wallclock_lag: WallclockLagFn<T>,
2656        txns_metrics: Arc<TxnMetrics>,
2657        read_only: bool,
2658        metrics_registry: &MetricsRegistry,
2659        controller_metrics: ControllerMetrics,
2660        connection_context: ConnectionContext,
2661        txn: &dyn StorageTxn<T>,
2662        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
2663    ) -> Self {
2664        let txns_client = persist_clients
2665            .open(persist_location.clone())
2666            .await
2667            .expect("location should be valid");
2668
2669        let persist_warm_task = warm_persist_state_in_background(
2670            txns_client.clone(),
2671            txn.get_collection_metadata().into_values(),
2672        );
2673        let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2674
2675        // This value must already be installed because we must ensure it's
2676        // durably recorded before it is used; otherwise we risk leaking persist
2677        // state.
2678        let txns_id = txn
2679            .get_txn_wal_shard()
2680            .expect("must call prepare initialization before creating storage controller");
2681
2682        let persist_table_worker = if read_only {
2683            let txns_write = txns_client
2684                .open_writer(
2685                    txns_id,
2686                    Arc::new(TxnsCodecRow::desc()),
2687                    Arc::new(UnitSchema),
2688                    Diagnostics {
2689                        shard_name: "txns".to_owned(),
2690                        handle_purpose: "follow txns upper".to_owned(),
2691                    },
2692                )
2693                .await
2694                .expect("txns schema shouldn't change");
2695            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2696        } else {
2697            let txns = TxnsHandle::open(
2698                T::minimum(),
2699                txns_client.clone(),
2700                txns_client.dyncfgs().clone(),
2701                Arc::clone(&txns_metrics),
2702                txns_id,
2703            )
2704            .await;
2705            persist_handles::PersistTableWriteWorker::new_txns(txns)
2706        };
2707        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2708
2709        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2710
2711        let introspection_ids = BTreeMap::new();
2712        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2713
2714        let (statistics_interval_sender, _) =
2715            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2716
2717        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2718            tokio::sync::mpsc::unbounded_channel();
2719
2720        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2721        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2722
2723        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2724
2725        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2726
2727        let now_dt = mz_ore::now::to_datetime(now());
2728
2729        Self {
2730            build_info,
2731            collections: BTreeMap::default(),
2732            dropped_objects: Default::default(),
2733            persist_table_worker,
2734            txns_read,
2735            txns_metrics,
2736            stashed_responses: vec![],
2737            pending_table_handle_drops_tx,
2738            pending_table_handle_drops_rx,
2739            pending_oneshot_ingestions: BTreeMap::default(),
2740            collection_manager,
2741            introspection_ids,
2742            introspection_tokens,
2743            now,
2744            read_only,
2745            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2746                source_statistics: BTreeMap::new(),
2747                webhook_statistics: BTreeMap::new(),
2748            })),
2749            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2750            statistics_interval_sender,
2751            instances: BTreeMap::new(),
2752            initialized: false,
2753            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2754            persist_location,
2755            persist: persist_clients,
2756            metrics,
2757            recorded_frontiers: BTreeMap::new(),
2758            recorded_replica_frontiers: BTreeMap::new(),
2759            wallclock_lag,
2760            wallclock_lag_last_recorded: now_dt,
2761            storage_collections,
2762            migrated_storage_collections: BTreeSet::new(),
2763            maintenance_ticker,
2764            maintenance_scheduled: false,
2765            instance_response_rx,
2766            instance_response_tx,
2767            persist_warm_task,
2768        }
2769    }
2770
2771    // This is different from `set_read_policies`, which is for external users.
2772    // This method is for setting the policy that the controller uses when
2773    // maintaining the read holds that it has for collections/exports at the
2774    // StorageCollections.
2775    //
2776    // This is really only used when dropping things, where we set the
2777    // ReadPolicy to the empty Antichain.
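    //
    // A hedged sketch of the dropping case (the call site shown is
    // illustrative, not the actual drop path):
    //
    //     self.set_hold_policies(vec![(
    //         id,
    //         ReadPolicy::ValidFrom(Antichain::new()),
    //     )]);
    //
    // which releases the controller's hold on `id` entirely.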
2778    #[instrument(level = "debug")]
2779    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
2780        let mut read_capability_changes = BTreeMap::default();
2781
2782        for (id, policy) in policies.into_iter() {
2783            if let Some(collection) = self.collections.get_mut(&id) {
2784                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2785                {
2786                    CollectionStateExtra::Ingestion(ingestion) => (
2787                        ingestion.write_frontier.borrow(),
2788                        &mut ingestion.derived_since,
2789                        &mut ingestion.hold_policy,
2790                    ),
2791                    CollectionStateExtra::None => {
2792                        unreachable!("set_hold_policies is only called for ingestions");
2793                    }
2794                    CollectionStateExtra::Export(export) => (
2795                        export.write_frontier.borrow(),
2796                        &mut export.derived_since,
2797                        &mut export.read_policy,
2798                    ),
2799                };
2800
2801                let new_derived_since = policy.frontier(write_frontier);
2802                let mut update = swap_updates(derived_since, new_derived_since);
2803                if !update.is_empty() {
2804                    read_capability_changes.insert(id, update);
2805                }
2806
2807                *hold_policy = policy;
2808            }
2809        }
2810
2811        if !read_capability_changes.is_empty() {
2812            self.update_hold_capabilities(&mut read_capability_changes);
2813        }
2814    }
2815
2816    #[instrument(level = "debug", fields(updates))]
2817    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2818        let mut read_capability_changes = BTreeMap::default();
2819
2820        if let Some(collection) = self.collections.get_mut(&id) {
2821            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2822                CollectionStateExtra::Ingestion(ingestion) => (
2823                    &mut ingestion.write_frontier,
2824                    &mut ingestion.derived_since,
2825                    &ingestion.hold_policy,
2826                ),
2827                CollectionStateExtra::None => {
2828                    if matches!(collection.data_source, DataSource::Progress) {
2829                        // We do get these, but can't do anything with them!
2830                    } else {
2831                        tracing::error!(
2832                            ?collection,
2833                            ?new_upper,
2834                            "updated write frontier for collection which is not an ingestion"
2835                        );
2836                    }
2837                    return;
2838                }
2839                CollectionStateExtra::Export(export) => (
2840                    &mut export.write_frontier,
2841                    &mut export.derived_since,
2842                    &export.read_policy,
2843                ),
2844            };
2845
2846            if PartialOrder::less_than(write_frontier, new_upper) {
2847                write_frontier.clone_from(new_upper);
2848            }
2849
2850            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2851            let mut update = swap_updates(derived_since, new_derived_since);
2852            if !update.is_empty() {
2853                read_capability_changes.insert(id, update);
2854            }
2855        } else if self.dropped_objects.contains_key(&id) {
2856            // We dropped an object but might still get updates from cluster
2857            // side, before it notices the drop. This is expected and fine.
2858        } else {
2859            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2860        }
2861
2862        if !read_capability_changes.is_empty() {
2863            self.update_hold_capabilities(&mut read_capability_changes);
2864        }
2865    }
2866
2867    // This is different from `update_read_capabilities`, which is for external users.
2868    // This method is for maintaining the read holds that the controller has at
2869    // the StorageCollections, for storage dependencies.
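    //
    // A sketch of the expected input shape (names are illustrative): each
    // entry is a `ChangeBatch` that asserts the new since and retracts the
    // old one, e.g.
    //
    //     let mut change = ChangeBatch::new();
    //     change.update(new_since_ts, 1);
    //     change.update(old_since_ts, -1);
    //     updates.insert(id, change);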
2870    #[instrument(level = "debug", fields(updates))]
2871    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2872        // Location to record consequences that we need to act on.
2873        let mut collections_net = BTreeMap::new();
2874
2875        // We must not rely on any specific relative ordering of `GlobalId`s.
2876        // That said, it is reasonable to assume that collections generally have
2877        // greater IDs than their dependencies, so starting with the largest is
2878        // a useful optimization.
2879        while let Some(key) = updates.keys().rev().next().cloned() {
2880            let mut update = updates.remove(&key).unwrap();
2881
2882            if key.is_user() {
2883                debug!(id = %key, ?update, "update_hold_capability");
2884            }
2885
2886            if let Some(collection) = self.collections.get_mut(&key) {
2887                match &mut collection.extra_state {
2888                    CollectionStateExtra::Ingestion(ingestion) => {
2889                        let changes = ingestion.read_capabilities.update_iter(update.drain());
2890                        update.extend(changes);
2891
2892                        let (changes, frontier, _cluster_id) =
2893                            collections_net.entry(key).or_insert_with(|| {
2894                                (
2895                                    <ChangeBatch<_>>::new(),
2896                                    Antichain::new(),
2897                                    ingestion.instance_id,
2898                                )
2899                            });
2900
2901                        changes.extend(update.drain());
2902                        *frontier = ingestion.read_capabilities.frontier().to_owned();
2903                    }
2904                    CollectionStateExtra::None => {
2905                        // WIP: See if this ever panics in ci.
2906                        soft_panic_or_log!(
2907                            "trying to update holds for collection {collection:?} which is not \
2908                             an ingestion: {update:?}"
2909                        );
2910                        continue;
2911                    }
2912                    CollectionStateExtra::Export(export) => {
2913                        let changes = export.read_capabilities.update_iter(update.drain());
2914                        update.extend(changes);
2915
2916                        let (changes, frontier, _cluster_id) =
2917                            collections_net.entry(key).or_insert_with(|| {
2918                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
2919                            });
2920
2921                        changes.extend(update.drain());
2922                        *frontier = export.read_capabilities.frontier().to_owned();
2923                    }
2924                }
2925            } else {
2926                // This is confusing and we should probably error.
2927                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
2928            }
2929        }
2930
2931        // Translate our net capability changes into `AllowCompaction` commands and
2932        // downgrade persist sinces.
2933        for (key, (mut changes, frontier, cluster_id)) in collections_net {
2934            if !changes.is_empty() {
2935                if key.is_user() {
2936                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
2937                }
2938
2939                let collection = self
2940                    .collections
2941                    .get_mut(&key)
2942                    .expect("missing collection state");
2943
2944                let read_holds = match &mut collection.extra_state {
2945                    CollectionStateExtra::Ingestion(ingestion) => {
2946                        ingestion.dependency_read_holds.as_mut_slice()
2947                    }
2948                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
2949                    CollectionStateExtra::None => {
2950                        soft_panic_or_log!(
2951                            "trying to downgrade read holds for collection which is not an \
2952                             ingestion: {collection:?}"
2953                        );
2954                        continue;
2955                    }
2956                };
2957
2958                for read_hold in read_holds.iter_mut() {
2959                    read_hold
2960                        .try_downgrade(frontier.clone())
2961                        .expect("we only advance the frontier");
2962                }
2963
2964                // Send AllowCompaction command directly to the instance
2965                if let Some(instance) = self.instances.get_mut(&cluster_id) {
2966                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
2967                } else {
2968                    soft_panic_or_log!(
2969                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
2970                    );
2971                }
2972            }
2973        }
2974    }
2975
2976    /// Validate that a collection exists for all identifiers, and error if any do not.
2977    fn validate_collection_ids(
2978        &self,
2979        ids: impl Iterator<Item = GlobalId>,
2980    ) -> Result<(), StorageError<T>> {
2981        for id in ids {
2982            self.storage_collections.check_exists(id)?;
2983        }
2984        Ok(())
2985    }
2986
2987    /// Validate that a collection exists for all identifiers, and error if any do not.
2988    fn validate_export_ids(
2989        &self,
2990        ids: impl Iterator<Item = GlobalId>,
2991    ) -> Result<(), StorageError<T>> {
2992        for id in ids {
2993            self.export(id)?;
2994        }
2995        Ok(())
2996    }
2997
2998    /// Opens a write handle for the given `shard`.
2999    ///
3000    /// The returned handle has had its most recent upper fetched, so callers
3001    /// can rely on that upper being reasonably up to date.
3005    async fn open_data_handles(
3006        &self,
3007        id: &GlobalId,
3008        shard: ShardId,
3009        relation_desc: RelationDesc,
3010        persist_client: &PersistClient,
3011    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
3012        let diagnostics = Diagnostics {
3013            shard_name: id.to_string(),
3014            handle_purpose: format!("controller data for {}", id),
3015        };
3016
3017        let mut write = persist_client
3018            .open_writer(
3019                shard,
3020                Arc::new(relation_desc),
3021                Arc::new(UnitSchema),
3022                diagnostics.clone(),
3023            )
3024            .await
3025            .expect("invalid persist usage");
3026
3027        // N.B.
3028        // Fetch the most recent upper for the write handle. Otherwise, this may be behind
3029        // the since of the since handle. It's vital this happens AFTER we create
3030        // the since handle as it needs to be linearized with that operation. It may be true
3031        // that creating the write handle after the since handle already ensures this, but we
3032        // do this out of an abundance of caution.
3033        //
3034        // Note that this returns the upper, but also sets it on the handle to be fetched later.
3035        write.fetch_recent_upper().await;
3036
3037        write
3038    }
3039
3040    /// Registers the given introspection collection and does any preparatory
3041    /// work that we have to do before we start writing to it. This
3042    /// preparatory work will include partial truncation or other cleanup
3043    /// schemes, depending on introspection type.
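    ///
    /// For example, per the `CollectionManagerKind` mapping further below:
    /// `IntrospectionType::Frontiers` is managed differentially, while
    /// `IntrospectionType::SourceStatusHistory` is append-only.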
3044    fn register_introspection_collection(
3045        &mut self,
3046        id: GlobalId,
3047        introspection_type: IntrospectionType,
3048        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
3049        persist_client: PersistClient,
3050    ) -> Result<(), StorageError<T>> {
3051        tracing::info!(%id, ?introspection_type, "registering introspection collection");
3052
3053        // In read-only mode we create a new shard for all migrated storage collections, so we
3054        // "trick" the write task into thinking that it's not in read-only mode, ensuring that
3055        // something is advancing this new shard.
3056        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
3057        if force_writable {
3058            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
3059            info!("writing to migrated storage collection {id} in read-only mode");
3060        }
3061
3062        let prev = self.introspection_ids.insert(introspection_type, id);
3063        assert!(
3064            prev.is_none(),
3065            "cannot have multiple IDs for introspection type"
3066        );
3067
3068        let metadata = self.storage_collections.collection_metadata(id)?.clone();
3069
3070        let read_handle_fn = move || {
3071            let persist_client = persist_client.clone();
3072            let metadata = metadata.clone();
3073
3074            let fut = async move {
3075                let read_handle = persist_client
3076                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
3077                        metadata.data_shard,
3078                        Arc::new(metadata.relation_desc.clone()),
3079                        Arc::new(UnitSchema),
3080                        Diagnostics {
3081                            shard_name: id.to_string(),
3082                            handle_purpose: format!("snapshot {}", id),
3083                        },
3084                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3085                    )
3086                    .await
3087                    .expect("invalid persist usage");
3088                read_handle
3089            };
3090
3091            fut.boxed()
3092        };
3093
3094        let recent_upper = write_handle.shared_upper();
3095
3096        match CollectionManagerKind::from(&introspection_type) {
3097            // For these, we first register the collection and then prepare it,
3098            // because the code that prepares a differential collection expects to
3099            // be able to update desired state via the collection manager
3100            // already.
3101            CollectionManagerKind::Differential => {
3102                let statistics_retention_duration =
3103                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());
3104
3105                // The clones below are shallow copies.
3106                let introspection_config = DifferentialIntrospectionConfig {
3107                    recent_upper,
3108                    introspection_type,
3109                    storage_collections: Arc::clone(&self.storage_collections),
3110                    collection_manager: self.collection_manager.clone(),
3111                    source_statistics: Arc::clone(&self.source_statistics),
3112                    sink_statistics: Arc::clone(&self.sink_statistics),
3113                    statistics_interval: self.config.parameters.statistics_interval.clone(),
3114                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
3115                    statistics_retention_duration,
3116                    metrics: self.metrics.clone(),
3117                    introspection_tokens: Arc::clone(&self.introspection_tokens),
3118                };
3119                self.collection_manager.register_differential_collection(
3120                    id,
3121                    write_handle,
3122                    read_handle_fn,
3123                    force_writable,
3124                    introspection_config,
3125                );
3126            }
3127            // For these, we first have to prepare and then register with
3128            // collection manager, because the preparation logic wants to read
3129            // the shard's contents and then do uncontested writes.
3130            //
3131            // TODO(aljoscha): We should make the truncation/cleanup work that
3132            // happens when we take over instead be a periodic thing, and make
3133            // it resilient to the upper moving concurrently.
3134            CollectionManagerKind::AppendOnly => {
3135                let introspection_config = AppendOnlyIntrospectionConfig {
3136                    introspection_type,
3137                    config_set: Arc::clone(self.config.config_set()),
3138                    parameters: self.config.parameters.clone(),
3139                    storage_collections: Arc::clone(&self.storage_collections),
3140                };
3141                self.collection_manager.register_append_only_collection(
3142                    id,
3143                    write_handle,
3144                    force_writable,
3145                    Some(introspection_config),
3146                );
3147            }
3148        }
3149
3150        Ok(())
3151    }
3152
3153    /// Remove statistics for sources/sinks that were dropped but still have statistics rows
3154    /// hanging around.
3155    fn reconcile_dangling_statistics(&self) {
3156        self.source_statistics
3157            .lock()
3158            .expect("poisoned")
3159            .source_statistics
3160            // Note that `storage_collections` also contains subsources.
3161            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3162        self.sink_statistics
3163            .lock()
3164            .expect("poisoned")
3165            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3166    }
3167
3168    /// Appends a new global ID, shard ID pair to the appropriate collection.
3169    /// Use a `diff` of 1 to append a new entry; -1 to retract an existing
3170    /// entry.
3171    ///
3172    /// # Panics
3173    /// - If `self.collections` does not have an entry for `global_id`.
3174    /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
3175    ///   a managed collection.
3176    /// - If diff is any value other than `1` or `-1`.
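    ///
    /// A usage sketch (the IDs are illustrative):
    ///
    /// ```ignore
    /// // Record mappings for newly created collections ...
    /// self.append_shard_mappings([id_a, id_b].into_iter(), Diff::ONE);
    /// // ... and retract them again when the collections are dropped.
    /// self.append_shard_mappings([id_a, id_b].into_iter(), Diff::MINUS_ONE);
    /// ```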
3177    #[instrument(level = "debug")]
3178    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3179    where
3180        I: Iterator<Item = GlobalId>,
3181    {
3182        mz_ore::soft_assert_or_log!(
3183            diff == Diff::MINUS_ONE || diff == Diff::ONE,
3184            "use 1 for insert or -1 for delete"
3185        );
3186
3187        let id = *self
3188            .introspection_ids
3189            .get(&IntrospectionType::ShardMapping)
3190            .expect("should be registered before this call");
3191
3192        let mut updates = vec![];
3193        // Pack updates into rows
3194        let mut row_buf = Row::default();
3195
3196        for global_id in global_ids {
3197            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3198                collection.collection_metadata.data_shard.clone()
3199            } else {
3200                panic!("unknown global id: {}", global_id);
3201            };
3202
3203            let mut packer = row_buf.packer();
3204            packer.push(Datum::from(global_id.to_string().as_str()));
3205            packer.push(Datum::from(shard_id.to_string().as_str()));
3206            updates.push((row_buf.clone(), diff));
3207        }
3208
3209        self.collection_manager.differential_append(id, updates);
3210    }
3211
3212    /// Determines and returns this collection's dependencies, if any.
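    ///
    /// For example (a sketch of the mapping below): an ingestion `s` with
    /// remap collection `r` yields `vec![s, r]`, an ingestion export yields
    /// itself plus its primary source's remap collection, and a table with
    /// `primary: Some(p)` yields `vec![p]`.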
3213    fn determine_collection_dependencies(
3214        &self,
3215        self_id: GlobalId,
3216        data_source: &DataSource<T>,
3217    ) -> Result<Vec<GlobalId>, StorageError<T>> {
3218        let dependency = match &data_source {
3219            DataSource::Introspection(_)
3220            | DataSource::Webhook
3221            | DataSource::Table { primary: None }
3222            | DataSource::Progress
3223            | DataSource::Other => vec![],
3224            DataSource::Table {
3225                primary: Some(primary),
3226            } => vec![*primary],
3227            DataSource::IngestionExport { ingestion_id, .. } => {
3228                // Ingestion exports depend on their primary source's remap
3229                // collection.
3230                let source_collection = self.collection(*ingestion_id)?;
3231                let ingestion_remap_collection_id = match &source_collection.data_source {
3232                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3233                    _ => unreachable!(
3234                        "SourceExport must only refer to primary sources that already exist"
3235                    ),
3236                };
3237
3238                // Ingestion exports (a.k.a. subsources) must make sure that (1)
3239                // their own collection's since stays one step behind the upper,
3240                // and (2) the remap shard's since stays one step behind
3241                // their upper. Hence they track themselves and the remap shard
3242                // as dependencies.
3243                vec![self_id, ingestion_remap_collection_id]
3244            }
3245            // Ingestions depend on their remap collection.
3246            DataSource::Ingestion(ingestion) => {
3247                // Ingestions must make sure that (1) their own collection's
3248                // since stays one step behind the upper, and (2) the remap
3249                // shard's since stays one step behind their upper. Hence they
3250                // track themselves and the remap shard as dependencies.
3251                let mut dependencies = vec![self_id];
3252                if self_id != ingestion.remap_collection_id {
3253                    dependencies.push(ingestion.remap_collection_id);
3254                }
3255                dependencies
3256            }
3257            DataSource::Sink { desc } => {
3258                // Sinks hold back their own frontier and the frontier of their input.
3259                vec![self_id, desc.sink.from]
3260            }
3261        };
3262
3263        Ok(dependency)
3264    }
3265
3266    async fn read_handle_for_snapshot(
3267        &self,
3268        id: GlobalId,
3269    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3270        let metadata = self.storage_collections.collection_metadata(id)?;
3271        read_handle_for_snapshot(&self.persist, id, &metadata).await
3272    }
3273
3274    /// Handles writing of status updates for sources/sinks to the appropriate
3275    /// status relation.
3276    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3277        if self.read_only {
3278            return;
3279        }
3280
3281        let mut sink_status_updates = vec![];
3282        let mut source_status_updates = vec![];
3283
3284        for update in updates {
3285            let id = update.id;
3286            if self.export(id).is_ok() {
3287                sink_status_updates.push(update);
3288            } else if self.storage_collections.check_exists(id).is_ok() {
3289                source_status_updates.push(update);
3290            }
3291        }
3292
3293        self.append_status_introspection_updates(
3294            IntrospectionType::SourceStatusHistory,
3295            source_status_updates,
3296        );
3297        self.append_status_introspection_updates(
3298            IntrospectionType::SinkStatusHistory,
3299            sink_status_updates,
3300        );
3301    }
3302
3303    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3304        self.collections
3305            .get(&id)
3306            .ok_or(StorageError::IdentifierMissing(id))
3307    }
3308
3309    /// Runs the identified ingestion using the current definition of the
3310    /// ingestion in-memory.
3311    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3312        tracing::info!(%id, "starting ingestion");
3313
3314        let collection = self.collection(id)?;
3315        let ingestion_description = match &collection.data_source {
3316            DataSource::Ingestion(i) => i.clone(),
3317            _ => {
3318                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3319                Err(StorageError::IdentifierInvalid(id))?
3320            }
3321        };
3322
3323        // Enrich all of the exports with their metadata
3324        let mut source_exports = BTreeMap::new();
3325        for (export_id, export) in ingestion_description.source_exports.clone() {
3326            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3327            source_exports.insert(
3328                export_id,
3329                SourceExport {
3330                    storage_metadata: export_storage_metadata,
3331                    details: export.details,
3332                    data_config: export.data_config,
3333                },
3334            );
3335        }
3336
3337        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;
3338
3339        let description = IngestionDescription::<CollectionMetadata> {
3340            source_exports,
3341            remap_metadata: remap_collection.collection_metadata.clone(),
3342            // The rest of the fields are identical
3343            desc: ingestion_description.desc.clone(),
3344            instance_id: ingestion_description.instance_id,
3345            remap_collection_id: ingestion_description.remap_collection_id,
3346        };
3347
3348        let storage_instance_id = description.instance_id;
3349        // Fetch the client for this ingestion's instance.
3350        let instance = self
3351            .instances
3352            .get_mut(&storage_instance_id)
3353            .ok_or_else(|| StorageError::IngestionInstanceMissing {
3354                storage_instance_id,
3355                ingestion_id: id,
3356            })?;
3357
3358        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3359        instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3360
3361        Ok(())
3362    }
3363
3364    /// Runs the identified export using the current definition of the export
3365    /// that we have in memory.
3366    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3367        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3368            return Err(StorageError::IdentifierMissing(id));
3369        };
3370
3371        let from_storage_metadata = self
3372            .storage_collections
3373            .collection_metadata(description.sink.from)?;
3374        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3375
3376        // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
3377        // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
3378        // reading it; otherwise assume we may have to replay from the beginning.
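        //
        // An illustrative example (timestamps are made up): with a requested
        // sink as_of of {5}, an implied capability of {8}, and a write
        // frontier of {12}, the effective as_of becomes {8}; since {8} is
        // strictly less than {12}, the snapshot must already have been
        // written out and `with_snapshot` is forced to false.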
3379        let export_state = self.storage_collections.collection_frontiers(id)?;
3380        let mut as_of = description.sink.as_of.clone();
3381        as_of.join_assign(&export_state.implied_capability);
3382        let with_snapshot = description.sink.with_snapshot
3383            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);
3384
3385        info!(
3386            sink_id = %id,
3387            from_id = %description.sink.from,
3388            write_frontier = ?export_state.write_frontier,
3389            ?as_of,
3390            ?with_snapshot,
3391            "run_export"
3392        );
3393
3394        let cmd = RunSinkCommand {
3395            id,
3396            description: StorageSinkDesc {
3397                from: description.sink.from,
3398                from_desc: description.sink.from_desc.clone(),
3399                connection: description.sink.connection.clone(),
3400                envelope: description.sink.envelope,
3401                as_of,
3402                version: description.sink.version,
3403                from_storage_metadata,
3404                with_snapshot,
3405                to_storage_metadata,
3406            },
3407        };
3408
3409        let storage_instance_id = description.instance_id.clone();
3410
3411        let instance = self
3412            .instances
3413            .get_mut(&storage_instance_id)
3414            .ok_or_else(|| StorageError::ExportInstanceMissing {
3415                storage_instance_id,
3416                export_id: id,
3417            })?;
3418
3419        instance.send(StorageCommand::RunSink(Box::new(cmd)));
3420
3421        Ok(())
3422    }
3423
3424    /// Update introspection with the current frontiers of storage objects.
3425    ///
3426    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3427    /// second during normal operation.
3428    fn update_frontier_introspection(&mut self) {
3429        let mut global_frontiers = BTreeMap::new();
3430        let mut replica_frontiers = BTreeMap::new();
3431
3432        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
3433            let id = collection_frontiers.id;
3434            let since = collection_frontiers.read_capabilities;
3435            let upper = collection_frontiers.write_frontier;
3436
3437            let instance = self
3438                .collections
3439                .get(&id)
3440                .and_then(|collection_state| match &collection_state.extra_state {
3441                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
3442                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
3443                    CollectionStateExtra::None => None,
3444                })
3445                .and_then(|i| self.instances.get(&i));
3446
3447            if let Some(instance) = instance {
3448                for replica_id in instance.replica_ids() {
3449                    replica_frontiers.insert((id, replica_id), upper.clone());
3450                }
3451            }
3452
3453            global_frontiers.insert(id, (since, upper));
3454        }
3455
3456        let mut global_updates = Vec::new();
3457        let mut replica_updates = Vec::new();
3458
3459        let mut push_global_update =
3460            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
3461                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
3462                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3463                let row = Row::pack_slice(&[
3464                    Datum::String(&id.to_string()),
3465                    read_frontier,
3466                    write_frontier,
3467                ]);
3468                global_updates.push((row, diff));
3469            };
3470
3471        let mut push_replica_update =
3472            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
3473                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3474                let row = Row::pack_slice(&[
3475                    Datum::String(&id.to_string()),
3476                    Datum::String(&replica_id.to_string()),
3477                    write_frontier,
3478                ]);
3479                replica_updates.push((row, diff));
3480            };
3481
3482        let mut old_global_frontiers =
3483            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
3484        for (&id, new) in &self.recorded_frontiers {
3485            match old_global_frontiers.remove(&id) {
3486                Some(old) if &old != new => {
3487                    push_global_update(id, new.clone(), Diff::ONE);
3488                    push_global_update(id, old, Diff::MINUS_ONE);
3489                }
3490                Some(_) => (),
3491                None => push_global_update(id, new.clone(), Diff::ONE),
3492            }
3493        }
3494        for (id, old) in old_global_frontiers {
3495            push_global_update(id, old, Diff::MINUS_ONE);
3496        }
3497
3498        let mut old_replica_frontiers =
3499            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
3500        for (&key, new) in &self.recorded_replica_frontiers {
3501            match old_replica_frontiers.remove(&key) {
3502                Some(old) if &old != new => {
3503                    push_replica_update(key, new.clone(), Diff::ONE);
3504                    push_replica_update(key, old, Diff::MINUS_ONE);
3505                }
3506                Some(_) => (),
3507                None => push_replica_update(key, new.clone(), Diff::ONE),
3508            }
3509        }
3510        for (key, old) in old_replica_frontiers {
3511            push_replica_update(key, old, Diff::MINUS_ONE);
3512        }
3513
3514        let id = self.introspection_ids[&IntrospectionType::Frontiers];
3515        self.collection_manager
3516            .differential_append(id, global_updates);
3517
3518        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
3519        self.collection_manager
3520            .differential_append(id, replica_updates);
3521    }
3522
3523    /// Refresh the wallclock lag introspection and metrics with the current lag values.
3524    ///
3525    /// This method produces wallclock lag metrics of two different shapes:
3526    ///
3527    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
3528    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
3529    ///   over the last minute, together with the current time.
3530    /// * Histograms: For each collection, we measure the lag of the write frontier behind
3531    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
3532    ///   together with the current histogram period.
3533    ///
3534    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
3535    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
3536    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
3537    /// Prometheus.
3538    ///
3539    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3540    /// second during normal operation.
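    ///
    /// A worked example with illustrative numbers: if a readable collection's
    /// write frontier lags wallclock time by 3s, 7s, and 5s across a minute,
    /// the history records a maximum lag of 7s for that minute, while the
    /// histogram stash records the power-of-two buckets 4, 8, and 8.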
3541    fn refresh_wallclock_lag(&mut self) {
3542        let now_ms = (self.now)();
3543        let histogram_period =
3544            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3545
3546        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
3547            Some(ts) => (self.wallclock_lag)(ts.clone()),
3548            None => Duration::ZERO,
3549        };
3550
3551        for frontiers in self.storage_collections.active_collection_frontiers() {
3552            let id = frontiers.id;
3553            let Some(collection) = self.collections.get_mut(&id) else {
3554                continue;
3555            };
3556
3557            let collection_unreadable =
3558                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
3559            let lag = if collection_unreadable {
3560                WallclockLag::Undefined
3561            } else {
3562                let lag = frontier_lag(&frontiers.write_frontier);
3563                WallclockLag::Seconds(lag.as_secs())
3564            };
3565
3566            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);
3567
3568            // No way to specify values as undefined in Prometheus metrics, so we use the
3569            // maximum value instead.
3570            let secs = lag.unwrap_seconds_or(u64::MAX);
3571            collection.wallclock_lag_metrics.observe(secs);
3572
3573            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
3574                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());
3575
3576                let instance_id = match &collection.extra_state {
3577                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3578                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3579                    CollectionStateExtra::None => None,
3580                };
3581                let workload_class = instance_id
3582                    .and_then(|id| self.instances.get(&id))
3583                    .and_then(|i| i.workload_class.clone());
3584                let labels = match workload_class {
3585                    Some(wc) => [("workload_class", wc.clone())].into(),
3586                    None => BTreeMap::new(),
3587                };
3588
3589                let key = (histogram_period, bucket, labels);
3590                *stash.entry(key).or_default() += Diff::ONE;
3591            }
3592        }
3593
3594        // Record lags to persist, if it's time.
3595        self.maybe_record_wallclock_lag();
3596    }
3597
3598    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
3599    /// last recording.
3600    ///
3601    /// We emit new introspection updates if the system time has passed into a new multiple of the
3602    /// recording interval (typically 1 minute) since the last refresh. The compute controller uses
3603    /// the same approach, ensuring that both controllers commit their lags at roughly the same
3604    /// time, avoiding confusion caused by inconsistencies.
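    ///
    /// For example, with a 1 minute interval and a last recording at
    /// 12:03:00: refreshes at 12:03:20 and 12:03:59 are skipped, while a
    /// refresh at 12:04:05 truncates to 12:04:00 and records new updates.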
3605    fn maybe_record_wallclock_lag(&mut self) {
3606        if self.read_only {
3607            return;
3608        }
3609
3610        let duration_trunc = |datetime: DateTime<_>, interval| {
3611            let td = TimeDelta::from_std(interval).ok()?;
3612            datetime.duration_trunc(td).ok()
3613        };
3614
3615        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3616        let now_dt = mz_ore::now::to_datetime((self.now)());
3617        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3618            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3619            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3620            duration_trunc(now_dt, *default).unwrap()
3621        });
3622        if now_trunc <= self.wallclock_lag_last_recorded {
3623            return;
3624        }
3625
3626        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3627
3628        let mut history_updates = Vec::new();
3629        let mut histogram_updates = Vec::new();
3630        let mut row_buf = Row::default();
3631        for frontiers in self.storage_collections.active_collection_frontiers() {
3632            let id = frontiers.id;
3633            let Some(collection) = self.collections.get_mut(&id) else {
3634                continue;
3635            };
3636
3637            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
3638            let row = Row::pack_slice(&[
3639                Datum::String(&id.to_string()),
3640                Datum::Null,
3641                max_lag.into_interval_datum(),
3642                Datum::TimestampTz(now_ts),
3643            ]);
3644            history_updates.push((row, Diff::ONE));
3645
3646            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3647                continue;
3648            };
3649
3650            for ((period, lag, labels), count) in std::mem::take(stash) {
3651                let mut packer = row_buf.packer();
3652                packer.extend([
3653                    Datum::TimestampTz(period.start),
3654                    Datum::TimestampTz(period.end),
3655                    Datum::String(&id.to_string()),
3656                    lag.into_uint64_datum(),
3657                ]);
3658                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3659                packer.push_dict(labels);
3660
3661                histogram_updates.push((row_buf.clone(), count));
3662            }
3663        }
3664
3665        if !history_updates.is_empty() {
3666            self.append_introspection_updates(
3667                IntrospectionType::WallclockLagHistory,
3668                history_updates,
3669            );
3670        }
3671        if !histogram_updates.is_empty() {
3672            self.append_introspection_updates(
3673                IntrospectionType::WallclockLagHistogram,
3674                histogram_updates,
3675            );
3676        }
3677
3678        self.wallclock_lag_last_recorded = now_trunc;
3679    }
3680
3681    /// Run periodic tasks.
3682    ///
3683    /// This method is invoked roughly once per second during normal operation. It is a good place
3684    /// for tasks that need to run periodically, such as state cleanup or updating of metrics.
3685    fn maintain(&mut self) {
3686        self.update_frontier_introspection();
3687        self.refresh_wallclock_lag();
3688
3689        // Perform instance maintenance work.
3690        for instance in self.instances.values_mut() {
3691            instance.refresh_state_metrics();
3692        }
3693    }
3694}
3695
3696impl From<&IntrospectionType> for CollectionManagerKind {
3697    fn from(value: &IntrospectionType) -> Self {
3698        match value {
3699            IntrospectionType::ShardMapping
3700            | IntrospectionType::Frontiers
3701            | IntrospectionType::ReplicaFrontiers
3702            | IntrospectionType::StorageSourceStatistics
3703            | IntrospectionType::StorageSinkStatistics
3704            | IntrospectionType::ComputeDependencies
3705            | IntrospectionType::ComputeOperatorHydrationStatus
3706            | IntrospectionType::ComputeMaterializedViewRefreshes
3707            | IntrospectionType::ComputeErrorCounts
3708            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3709
3710            IntrospectionType::SourceStatusHistory
3711            | IntrospectionType::SinkStatusHistory
3712            | IntrospectionType::PrivatelinkConnectionStatusHistory
3713            | IntrospectionType::ReplicaStatusHistory
3714            | IntrospectionType::ReplicaMetricsHistory
3715            | IntrospectionType::WallclockLagHistory
3716            | IntrospectionType::WallclockLagHistogram
3717            | IntrospectionType::PreparedStatementHistory
3718            | IntrospectionType::StatementExecutionHistory
3719            | IntrospectionType::SessionHistory
3720            | IntrospectionType::StatementLifecycleHistory
3721            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3722        }
3723    }
3724}
3725
3726/// Get the current rows in the given statistics table. This is used to bootstrap
3727/// the statistics tasks.
3728///
3729// TODO(guswynn): we need to be more careful about the update time we get here:
3730// <https://github.com/MaterializeInc/database-issues/issues/7564>
3731async fn snapshot_statistics<T>(
3732    id: GlobalId,
3733    upper: Antichain<T>,
3734    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3735) -> Vec<Row>
3736where
3737    T: Codec64 + From<EpochMillis> + TimestampManipulation,
3738{
3739    match upper.as_option() {
3740        Some(f) if f > &T::minimum() => {
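            // The upper is exclusive, so step back by one to get the latest readable time.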
3741            let as_of = f.step_back().unwrap();
3742
3743            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3744            snapshot
3745                .into_iter()
3746                .map(|(row, diff)| {
3747                    assert_eq!(diff, 1);
3748                    row
3749                })
3750                .collect()
3751        }
3752        // If the collection is closed or the frontier is the minimum, we cannot
3753        // or don't need to read a snapshot (respectively).
3754        _ => Vec::new(),
3755    }
3756}
3757
3758async fn read_handle_for_snapshot<T>(
3759    persist: &PersistClientCache,
3760    id: GlobalId,
3761    metadata: &CollectionMetadata,
3762) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3763where
3764    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3765{
3766    let persist_client = persist
3767        .open(metadata.persist_location.clone())
3768        .await
3769        .unwrap();
3770
3771    // We create a new read handle every time someone requests a snapshot and then immediately
3772    // expire it, instead of keeping a read handle permanently in our state, to avoid having it
3773    // heartbeat continuously. The assumption is that calls to snapshot are rare enough that
3774    // always creating a new handle is worth it.
3775    let read_handle = persist_client
3776        .open_leased_reader::<SourceData, (), _, _>(
3777            metadata.data_shard,
3778            Arc::new(metadata.relation_desc.clone()),
3779            Arc::new(UnitSchema),
3780            Diagnostics {
3781                shard_name: id.to_string(),
3782                handle_purpose: format!("snapshot {}", id),
3783            },
3784            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3785        )
3786        .await
3787        .expect("invalid persist usage");
3788    Ok(read_handle)
3789}
3790
3791/// State maintained about individual collections.
3792#[derive(Debug)]
3793struct CollectionState<T: TimelyTimestamp> {
3794    /// The source of this collection's data.
3795    pub data_source: DataSource<T>,
3796
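    /// The collection's persist-related metadata.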
3797    pub collection_metadata: CollectionMetadata,
3798
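    /// Additional state maintained for specific collection types (ingestions and exports).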
3799    pub extra_state: CollectionStateExtra<T>,
3800
3801    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
3802    wallclock_lag_max: WallclockLag,
3803    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
3804    /// introspection update.
3805    ///
3806    /// Keys are `(period, lag, labels)` triples, values are counts.
3807    ///
3808    /// If this is `None`, wallclock lag is not tracked for this collection.
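    ///
    /// A sketch of what a stashed entry might look like (the bucket values and label keys are
    /// purely illustrative):
    ///
    /// ```text
    /// ((period: 09:00..09:05 UTC, lag: 8s, labels: {"workload_class": "qa"}), count: 42)
    /// ```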
3809    wallclock_lag_histogram_stash: Option<
3810        BTreeMap<
3811            (
3812                WallclockLagHistogramPeriod,
3813                WallclockLag,
3814                BTreeMap<&'static str, String>,
3815            ),
3816            Diff,
3817        >,
3818    >,
3819    /// Frontier wallclock lag metrics tracked for this collection.
3820    wallclock_lag_metrics: WallclockLagMetrics,
3821}
3822
3823impl<T: TimelyTimestamp> CollectionState<T> {
3824    fn new(
3825        data_source: DataSource<T>,
3826        collection_metadata: CollectionMetadata,
3827        extra_state: CollectionStateExtra<T>,
3828        wallclock_lag_metrics: WallclockLagMetrics,
3829    ) -> Self {
3830        // Only collect wallclock lag histogram data for collections written by storage, to avoid
3831        // duplicate measurements. Collections written by other components (e.g. compute) have
3832        // their wallclock lags recorded by those components.
3833        let wallclock_lag_histogram_stash = match &data_source {
3834            DataSource::Other => None,
3835            _ => Some(Default::default()),
3836        };
3837
3838        Self {
3839            data_source,
3840            collection_metadata,
3841            extra_state,
3842            wallclock_lag_max: WallclockLag::MIN,
3843            wallclock_lag_histogram_stash,
3844            wallclock_lag_metrics,
3845        }
3846    }
3847}
3848
3849/// Additional state that the controller maintains for select collection types.
3850#[derive(Debug)]
3851enum CollectionStateExtra<T: TimelyTimestamp> {
3852    Ingestion(IngestionState<T>),
3853    Export(ExportState<T>),
3854    None,
3855}
3856
3857/// State maintained about ingestions and ingestion exports.
3858#[derive(Debug)]
3859struct IngestionState<T: TimelyTimestamp> {
3860    /// Used only to keep track of changes to the `derived_since` frontier.
3861    pub read_capabilities: MutableAntichain<T>,
3862
3863    /// The current since frontier, derived from `write_frontier` using
3864    /// `hold_policy`.
3865    pub derived_since: Antichain<T>,
3866
3867    /// Holds that this ingestion (or ingestion export) has on its dependencies.
3868    pub dependency_read_holds: Vec<ReadHold<T>>,
3869
3870    /// Reported write frontier.
3871    pub write_frontier: Antichain<T>,
3872
3873    /// The policy that drives how we downgrade our read hold, that is, how we
3874    /// derive our since from our upper.
3875    ///
3876    /// This is a _storage-controller-internal_ policy used to derive the
3877    /// controller's own read hold on the collection. It should not be confused with
3878    /// any read policies that the adapter might install at [StorageCollections].
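    ///
    /// For example, a lag-based policy keeps the derived since a bounded distance behind the
    /// write frontier, so that recent (but not necessarily current) times stay readable.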
3879    pub hold_policy: ReadPolicy<T>,
3880
3881    /// The ID of the instance in which the ingestion is running.
3882    pub instance_id: StorageInstanceId,
3883
3884    /// Set of replica IDs on which this ingestion is hydrated.
3885    pub hydrated_on: BTreeSet<ReplicaId>,
3886}
3887
3888/// A description of a status history collection.
3889///
3890/// Used to inform partial truncation, see
3891/// [`collection_mgmt::partially_truncate_status_history`].
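///
/// For example, [`source_status_history_desc`] below builds a descriptor that keys rows by
/// `(source_id, replica_id)` and retains only the last N entries per key.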
3892struct StatusHistoryDesc<K> {
3893    retention_policy: StatusHistoryRetentionPolicy,
3894    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
3895    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
3896}
3897enum StatusHistoryRetentionPolicy {
3898    /// Truncates everything but the last N updates for each key.
3899    LastN(usize),
3900    /// Truncates everything older than the time window for each key.
3901    TimeWindow(Duration),
3902}
3903
3904fn source_status_history_desc(
3905    params: &StorageParameters,
3906) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3907    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3908    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3909    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3910    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3911
3912    StatusHistoryDesc {
3913        retention_policy: StatusHistoryRetentionPolicy::LastN(
3914            params.keep_n_source_status_history_entries,
3915        ),
3916        extract_key: Box::new(move |datums| {
3917            (
3918                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3919                if datums[replica_id_idx].is_null() {
3920                    None
3921                } else {
3922                    Some(
3923                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3924                            .expect("ReplicaId column"),
3925                    )
3926                },
3927            )
3928        }),
3929        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3930    }
3931}
3932
3933fn sink_status_history_desc(
3934    params: &StorageParameters,
3935) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3936    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3937    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3938    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3939    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3940
3941    StatusHistoryDesc {
3942        retention_policy: StatusHistoryRetentionPolicy::LastN(
3943            params.keep_n_sink_status_history_entries,
3944        ),
3945        extract_key: Box::new(move |datums| {
3946            (
3947                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
3948                if datums[replica_id_idx].is_null() {
3949                    None
3950                } else {
3951                    Some(
3952                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3953                            .expect("ReplicaId column"),
3954                    )
3955                },
3956            )
3957        }),
3958        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3959    }
3960}
3961
3962fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
3963    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
3964    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
3965    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3966
3967    StatusHistoryDesc {
3968        retention_policy: StatusHistoryRetentionPolicy::LastN(
3969            params.keep_n_privatelink_status_history_entries,
3970        ),
3971        extract_key: Box::new(move |datums| {
3972            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
3973        }),
3974        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3975    }
3976}
3977
3978fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
3979    let desc = &REPLICA_STATUS_HISTORY_DESC;
3980    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3981    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
3982    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3983
3984    StatusHistoryDesc {
3985        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
3986            params.replica_status_history_retention_window,
3987        ),
3988        extract_key: Box::new(move |datums| {
3989            (
3990                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
3991                datums[process_idx].unwrap_uint64(),
3992            )
3993        }),
3994        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3995    }
3996}
3997
3998/// Replace one antichain with another, tracking the overall changes in the returned `ChangeBatch`.
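///
/// The swap only happens when the replacement frontier is at least the current one; otherwise
/// the frontier is left untouched and the returned change batch is empty. A minimal sketch of
/// the expected behavior, using `u64` timestamps for illustration:
///
/// ```ignore
/// let mut frontier = Antichain::from_elem(3u64);
/// let changes = swap_updates(&mut frontier, Antichain::from_elem(5));
/// // `frontier` is now {5}, and `changes` records {(5, +1), (3, -1)}.
///
/// let changes = swap_updates(&mut frontier, Antichain::from_elem(4));
/// // {5} is not <= {4}, so nothing happens and `changes` is empty.
/// assert_eq!(frontier, Antichain::from_elem(5));
/// ```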
3999fn swap_updates<T: Timestamp>(
4000    from: &mut Antichain<T>,
4001    mut replace_with: Antichain<T>,
4002) -> ChangeBatch<T> {
4003    let mut update = ChangeBatch::new();
4004    if PartialOrder::less_equal(from, &replace_with) {
4005        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
4006        std::mem::swap(from, &mut replace_with);
4007        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
4008    }
4009    update
4010}