mz_storage_controller/
lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the storage controller trait.

use std::any::Any;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::num::NonZeroI64;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
    WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::RustType;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
    TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    SinkStatisticsUpdate, SourceStatisticsUpdate, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};
use crate::statistics::StatsState;

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to provide results of the ingestion.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// Cluster currently running this ingestion
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
    /// Consume the pending ingestion, responding with a cancelation message.
    ///
    /// TODO(cf2): Refine these error messages so they're not stringly typed.
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

/// A storage controller for a storage instance.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,
    /// The fencing token for this instance of the controller.
    envd_epoch: NonZeroI64,

    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to effect changes to external systems
    /// (largely persist).
    read_only: bool,

    /// Collections maintained by the storage controller.
    ///
    /// This collection only grows, although individual collections may be rendered unusable.
    /// This is to prevent the re-binding of identifiers to other descriptions.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Map from IDs of objects that have been dropped to replicas we are still
    /// expecting DroppedId messages from. This is cleared out once all replicas
    /// have responded.
    ///
    /// We use this only to catch problems in the protocol between controller
    /// and replicas: for example, we can differentiate between late messages for
    /// objects that have already been dropped and unexpected (read: erroneous)
    /// messages from the replica.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Write handle for table shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,
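    /// Metrics for the txn-wal subsystem.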
    txns_metrics: Arc<TxnMetrics>,
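    /// Storage responses stashed for later processing.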
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Channel for sending table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Channel for receiving table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Closures that can be used to send responses from oneshot ingestions.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Tracks which collection is responsible for which [`IntrospectionType`].
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for tasks that drive updating introspection collections. Dropping
    /// this will make sure that any tasks (or other resources) will stop when
    /// needed.
    // TODO(aljoscha): Should these live somewhere else?
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    // The following two fields must always be locked in order.
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates` messages,
    /// as well as webhook statistics.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates` messages.
    sink_statistics: Arc<Mutex<BTreeMap<GlobalId, statistics::StatsState<SinkStatisticsUpdate>>>>,
    /// A way to update the statistics interval in the statistics tasks.
    statistics_interval_sender: Sender<Duration>,

    /// Clients for all known storage instances.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration to apply to newly provisioned instances, and use during purification.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections
    persist: Arc<PersistClientCache>,
    /// Metrics of the Storage controller
    metrics: StorageControllerMetrics,
    /// `(read, write)` frontiers that have been recorded in the `Frontiers` collection, kept to be
    /// able to retract old rows.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// Write frontiers that have been recorded in the `ReplicaFrontiers` collection, kept to be
    /// able to retract old rows.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function that computes the lag between the given time and wallclock time.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to a [StorageCollections].
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Migrated storage collections that can be written even in read only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Shared transmit channel for replicas to send responses.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receive end for replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task run at startup to warm persist state.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

/// Warm up persist state for `shard_ids` in a background task.
///
/// With better parallelism during startup this would likely be unnecessary, but empirically we see
/// some nice speedups with this relatively simple function.
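///
/// The returned handle is wrapped in an [`AbortOnDropHandle`] (see the `persist_warm_task`
/// field on [`Controller`]) and dropped once bootstrap has opened all collection shards,
/// which aborts any warming work still in flight.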
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        // We serialize the dyncfg updates in StorageParameters, but configure
        // persist separately.
        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    /// Get the current configuration
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on zero-replica clusters are always considered
            // hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                // An ingestion is hydrated if it is hydrated on at least one replica.
                Ok(ingestion_state.hydrated_on.len() >= 1)
            }
            CollectionStateExtra::Export(_) => {
                // For now, sinks are always considered hydrated. We rely on
                // them starting up "instantly" and don't wait for them when
                // checking hydration status of a replica.  TODO(sinks): base
                // this off of the sink shard's frontier?
                Ok(true)
            }
            CollectionStateExtra::None => {
                // For now, objects that are not ingestions are always
                // considered hydrated. This is tables and webhooks, as of
                // today.
                Ok(true)
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        // If target_replica_ids is provided, use it as the set of target
        // replicas. Otherwise check for hydration on any replica.
        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => {
                            // No target replicas given, so check that it's
                            // hydrated on at least one replica.
                            state.hydrated_on.len() >= 1
                        }
                    }
                }
                CollectionStateExtra::Export(_) => {
                    // For now, sinks are always considered hydrated. We rely on
                    // them starting up "instantly" and don't wait for them when
                    // checking hydration status of a replica.  TODO(sinks):
                    // base this off of the sink shard's frontier?
                    true
                }
                CollectionStateExtra::None => {
                    // For now, objects that are not ingestions are always
                    // considered hydrated. This is tables and webhooks, as of
                    // today.
                    true
                }
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
                // We continue with our loop instead of breaking out early, so
                // that we log all non-hydrated collections.
            }
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        // In theory, we could pull all our frontiers from storage collections...
        // but in practice those frontiers may not be identical. For historical reasons, we use the
        // locally-tracked frontier for sinks but the storage-collections-maintained frontier for
        // sources.
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestions(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        Box::new(self.instances[&instance_id].active_ingestions())
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            self.envd_epoch,
            metrics,
            Arc::clone(self.config().config_set()),
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for id in instance.active_ingestions() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // NOTE(aljoscha): We filter out the remap collection because we
            // don't get any status updates about it from the replica side. So
            // we don't want to synthesize a 'paused' status here.
            //
            // TODO(aljoscha): I think we want to fix this eventually, and make
            // sure we get status updates for the remap shard as well. Currently
            // its handling in the source status collection is a bit difficult
            // because we don't have updates for it in the status history
            // collection.
            let subsource_ids = ingestion_description
                .collection_ids()
                .filter(|id| id != &ingestion_description.remap_collection_id);
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

    /// Create and "execute" the described collection.
    ///
    /// "Execute" is in scare quotes because what executing a collection means
    /// varies widely based on the type of collection you're creating.
    ///
    /// The general process creating a collection undergoes is:
    /// 1. Enrich the description we get from the user with metadata only the
    ///    storage controller knows. This is mostly a matter of separating
    ///    concerns.
    /// 2. Generate write and read persist handles for the collection.
    /// 3. Store the collection's metadata in the appropriate field.
    /// 4. "Execute" the collection. What that means is contingent on the type of
    ///    collection, so consult the code for more details.
    ///
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods. @guswynn observes
    // that a number of these operations could be moved into fns on
    // `DataSource`.
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // At this point we're connected to all the collection shards in persist. Our warming task
        // is no longer useful, so abort it if it's still running.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state. We must neither
        // 1. create a dropped identifier, nor
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
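        // After sorting and deduplicating identical entries, any remaining
        // adjacent entries with the same id must have differing descriptions,
        // which we reject below.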
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
                    Ok(shard)
                };

                let remap_shard = match &description.data_source {
                    // Only ingestions can have remap shards.
                    DataSource::Ingestion(IngestionDescription {
                        remap_collection_id,
                        ..
                    }) => {
                        // Iff ingestion has a remap collection, its metadata must
                        // exist (and be correct) by this point.
                        Some(get_shard(*remap_collection_id)?)
                    }
                    _ => None,
                };

                // If the shard is being managed by txn-wal (initially, tables), then we need to
                // pass along the shard id for the txns shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    remap_shard,
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open persist handles for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Reborrow the `&mut self` as immutable, as all the concurrent work to be processed in
        // this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down somewhere
                    debug!(
                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
                        id, metadata.remap_shard, metadata.data_shard
                    );

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered` (which
            // `buffer_unordered` uses underneath):
            // - One is captured here <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a `FuturesUnordered`
            // stream attempts to obtain an async mutex that is also obtained in the futures
            // being polled.
            //
            // Both of these could potentially be issues in all usages of `buffer_unordered` in
            // this method, so we stick to the standard advice: only use `try_collect` or
            // `collect`!
            .try_collect()
            .await?;

        // The set of collections that we should render at the end of this
        // function.
        let mut to_execute = BTreeSet::new();
        // New collections that are being created; this is distinct from the set
        // of collections we plan to execute because
        // `DataSource::IngestionExport` is added as a new collection, but is
        // not executed directly.
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        // Reorder in dependency order.
        to_register.sort_by_key(|(id, ..)| *id);

        // Register tables first, but register them in reverse order since earlier tables
        // can depend on later tables.
        //
        // Note: We could do more complex sorting to avoid the allocations, but IMO it's
        // easier to reason about it this way.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics need a level of indirection so we can mutably borrow
        // `self` when registering collections and when we are inserting
        // statistics.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
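            // A collection counts as txn-managed here if it has a txns shard,
            // unless we are in read-only mode and it is a migrated collection,
            // in which case the read-only table worker writes to it directly.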
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let mut data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            // Ensure that the ingestion has an export for its primary source if applicable.
            // This is done in an awkward spot to appease the borrow checker.
            // TODO(database-issues#8620): This will be removed once sources no longer export
            // to primary collections and only export to explicit SourceExports (tables).
            if let DataSource::Ingestion(ingestion) = &mut data_source {
                let export = ingestion.desc.primary_source_export();
                ingestion.source_exports.insert(id, export);
            }

            let write_frontier = write.upper();

            // Determine if this collection has another dependency.
            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            // Assert some invariants.
            //
            // TODO(alter_table): Include Tables (is_in_txns) in this check. After
            // supporting ALTER TABLE, it's now possible for a table to have a dependency
            // and thus run this check. But tables are managed by txn-wal and thus the
            // upper of the shard's `write_handle` generally isn't the logical upper of
            // the shard. Instead we need to thread through the upper of the `txn` shard
            // here so we can check this invariant.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                // As the halt message says, this can happen when trying to come
                // up in read-only mode and the current read-write
                // environmentd/controller deletes a collection. We exit
                // gracefully, which means we'll get restarted and get to try
                // again.
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                // The dependency since cannot be beyond the dependent (our)
                // upper unless the collection is new. In practice, the
                // dependency is the remap shard of a source (export), and if
                // the since is allowed to "catch up" to the upper, that is
                // `upper <= since`, a restarting ingestion cannot differentiate
                // between updates that have already been written out to the
                // backing persist shard and updates that have yet to be
                // written. We would write duplicate updates.
                //
                // If this check fails, it means that the read hold installed on
                // the dependency was probably not upheld; if it were, the
                // dependency's since could not have advanced as far as the
                // dependent's upper.
                //
                // We don't care about the dependency since when the write
                // frontier is empty. In that case, no-one can write down any
                // more updates.
                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                            dependent ({id}): upper {:?} \n
                            dependency since {:?} \n
                            dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            // Perform data source-specific setup.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
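            // Both are filled in by the data source-specific match below and
            // folded into the `CollectionState` constructed at the end of this
            // loop iteration.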
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    // We always register the collection with the collection manager,
                    // regardless of read-only mode. The CollectionManager itself is
                    // aware of read-only mode and will not attempt to write before told
                    // to do so.
                    //
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    // This collection of statistics is periodically aggregated into
                    // `source_statistics`.
                    new_webhook_statistic_entries.insert(id);
                    // Register the collection so our manager knows about it.
                    //
                    // NOTE: Maybe this shouldn't be in the collection manager,
                    // and collection manager should only be responsible for
                    // built-in introspection collections?
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            // Record the ingestion's cluster ID for the
                            // ingestion export. This way we always have a
                            // record of it, even if the ingestion's collection
                            // description disappears.
                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the source export doesn't do anything; ensure we execute the source instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

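                    // `determine_collection_dependencies` is expected to yield
                    // exactly two holds for a sink: one on the sink's own
                    // collection (`self_hold`) and one on its input
                    // (`read_hold`).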
1142                    let [self_hold, read_hold] =
1143                        dependency_read_holds.try_into().expect("two holds");
1144
1145                    let state = ExportState::new(
1146                        desc.instance_id,
1147                        read_hold,
1148                        self_hold,
1149                        write_frontier.clone(),
1150                        ReadPolicy::step_back(),
1151                    );
1152                    maybe_instance_id = Some(state.cluster_id);
1153                    extra_state = CollectionStateExtra::Export(state);
1154
1155                    new_sink_statistic_entries.insert(id);
1156                }
1157            }
1158
1159            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
1160            let collection_state =
1161                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);
1162
1163            self.collections.insert(id, collection_state);
1164        }
1165
1166        {
1167            // Ensure all sources are associated with the statistics.
1168            //
1169            // We currently do not call `create_collections` after we have initialized the source
1170            // statistics scrapers, but in the interest of safety, avoid overriding existing
1171            // statistics values.
1172            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1173            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
1174
1175            for id in new_source_statistic_entries {
1176                source_statistics
1177                    .source_statistics
1178                    .entry(id)
1179                    .or_insert_with(|| StatsState::new(SourceStatisticsUpdate::new(id)));
1180            }
1181            for id in new_webhook_statistic_entries {
1182                source_statistics.webhook_statistics.entry(id).or_default();
1183            }
1184            for id in new_sink_statistic_entries {
1185                sink_statistics
1186                    .entry(id)
1187                    .or_insert_with(|| StatsState::new(SinkStatisticsUpdate::new(id)));
1188            }
1189        }
1190
1191        // Register the tables all in one batch.
1192        if !table_registers.is_empty() {
1193            let register_ts = register_ts
1194                .expect("caller should have provided a register_ts when creating a table");
1195
1196            if self.read_only {
1197                // In read-only mode, we use a special read-only table worker
1198                // that allows writing to migrated tables and will continually
1199                // bump their shard upper so that it tracks the txn shard upper.
1200                // We do this, so that they remain readable at a recent
1201                // timestamp, which in turn allows dataflows that depend on them
1202                // to (re-)hydrate.
1203                //
1204                // We only want to register migrated tables, though, and leave
1205                // existing tables out/never write to them in read-only mode.
1206                table_registers
1207                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));
1208
1209                self.persist_table_worker
1210                    .register(register_ts, table_registers)
1211                    .await
1212                    .expect("table worker unexpectedly shut down");
1213            } else {
1214                self.persist_table_worker
1215                    .register(register_ts, table_registers)
1216                    .await
1217                    .expect("table worker unexpectedly shut down");
1218            }
1219        }
1220
1221        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);
1222
1223        // TODO(guswynn): perform the io in this final section concurrently.
1224        for id in to_execute {
1225            match &self.collection(id)?.data_source {
1226                DataSource::Ingestion(ingestion) => {
1227                    if !self.read_only
1228                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
1229                            && ingestion.desc.connection.supports_read_only())
1230                    {
1231                        self.run_ingestion(id)?;
1232                    }
1233                }
1234                DataSource::IngestionExport { .. } => unreachable!(
1235                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
1236                ),
1237                DataSource::Introspection(_)
1238                | DataSource::Webhook
1239                | DataSource::Table { .. }
1240                | DataSource::Progress
1241                | DataSource::Other => {}
1242                DataSource::Sink { .. } => {
1243                    if !self.read_only {
1244                        self.run_export(id)?;
1245                    }
1246                }
1247            };
1248        }
1249
1250        Ok(())
1251    }
1252
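    /// Checks whether the identified ingestion can be altered to use the given
    /// `source_desc`, i.e. whether the new description is alter-compatible with
    /// the current one. Does not apply any changes.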
1253    fn check_alter_ingestion_source_desc(
1254        &mut self,
1255        ingestion_id: GlobalId,
1256        source_desc: &SourceDesc,
1257    ) -> Result<(), StorageError<Self::Timestamp>> {
1258        let source_collection = self.collection(ingestion_id)?;
1259        let data_source = &source_collection.data_source;
1260        match &data_source {
1261            DataSource::Ingestion(cur_ingestion) => {
1262                cur_ingestion
1263                    .desc
1264                    .alter_compatible(ingestion_id, source_desc)?;
1265            }
1266            o => {
1267                tracing::info!(
1268                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1269                    o
1270                );
1271                Err(AlterError { id: ingestion_id })?
1272            }
1273        }
1274
1275        Ok(())
1276    }
1277
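    /// Alters the `SourceDesc` of the identified ingestion, after checking
    /// compatibility. The ingestion is deliberately not re-run; this is only done
    /// in preparation for adding subsources, which will then run it.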
1278    async fn alter_ingestion_source_desc(
1279        &mut self,
1280        ingestion_id: GlobalId,
1281        source_desc: SourceDesc,
1282    ) -> Result<(), StorageError<Self::Timestamp>> {
1283        self.check_alter_ingestion_source_desc(ingestion_id, &source_desc)?;
1284
1285        // Update the `SourceDesc` and the source exports
1286        // simultaneously.
1287        let collection = self
1288            .collections
1289            .get_mut(&ingestion_id)
1290            .expect("validated exists");
1291        let curr_ingestion = match &mut collection.data_source {
1292            DataSource::Ingestion(curr_ingestion) => curr_ingestion,
1293            _ => unreachable!("verified collection refers to ingestion"),
1294        };
1295
1296        curr_ingestion.desc = source_desc;
1297        tracing::debug!("altered {ingestion_id}'s SourceDesc");
1298
1299        // n.b. we do not re-run updated ingestions because updating the source
1300        // desc is only done in preparation for adding subsources, which will
1301        // then run the ingestion.
1302        //
1303        // If this expectation ever changes, we will almost certainly know
1304        // because failing to run an altered ingestion means that whatever
1305        // changes you expect to occur will not be reflected in the running
1306        // dataflow.
1307
1308        Ok(())
1309    }
1310
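    /// Replaces the connections of the identified ingestions, informs
    /// `StorageCollections`, and re-runs every ingestion whose connection actually
    /// changed.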
1311    async fn alter_ingestion_connections(
1312        &mut self,
1313        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1314    ) -> Result<(), StorageError<Self::Timestamp>> {
1315        // Also have to let StorageCollections know!
1316        self.storage_collections
1317            .alter_ingestion_connections(source_connections.clone())
1318            .await?;
1319
1320        let mut ingestions_to_run = BTreeSet::new();
1321
1322        for (id, conn) in source_connections {
1323            let collection = self
1324                .collections
1325                .get_mut(&id)
1326                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1327
1328            match &mut collection.data_source {
1329                DataSource::Ingestion(ingestion) => {
1330                    // If the connection hasn't changed, there's no sense in
1331                    // re-rendering the dataflow.
1332                    if ingestion.desc.connection != conn {
1333                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1334                        ingestion.desc.connection = conn;
1335                        ingestions_to_run.insert(id);
1336                    } else {
1337                        tracing::warn!(
1338                            "alter_ingestion_connections called on {id} but the \
1339                            connection was the same"
1340                        );
1341                    }
1342                }
1343                o => {
1344                    tracing::warn!("alter_ingestion_connections called on {:?}", o);
1345                    Err(StorageError::IdentifierInvalid(id))?;
1346                }
1347            }
1348        }
1349
1350        for id in ingestions_to_run {
1351            self.run_ingestion(id)?;
1352        }
1353        Ok(())
1354    }
1355
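    /// Updates the data config of the given source exports, both on the export
    /// collections themselves and on the ingestions they belong to, and re-runs
    /// the affected ingestions.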
1356    async fn alter_ingestion_export_data_configs(
1357        &mut self,
1358        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1359    ) -> Result<(), StorageError<Self::Timestamp>> {
1360        // Also have to let StorageCollections know!
1361        self.storage_collections
1362            .alter_ingestion_export_data_configs(source_exports.clone())
1363            .await?;
1364
1365        let mut ingestions_to_run = BTreeSet::new();
1366
1367        for (source_export_id, new_data_config) in source_exports {
1368            // We need to adjust the data config on the CollectionState for
1369            // the source export collection directly
1370            let source_export_collection = self
1371                .collections
1372                .get_mut(&source_export_id)
1373                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1374            let ingestion_id = match &mut source_export_collection.data_source {
1375                DataSource::IngestionExport {
1376                    ingestion_id,
1377                    details: _,
1378                    data_config,
1379                } => {
1380                    *data_config = new_data_config.clone();
1381                    *ingestion_id
1382                }
1383                o => {
1384                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1385                    Err(StorageError::IdentifierInvalid(source_export_id))?
1386                }
1387            };
1388            // We also need to adjust the data config on the CollectionState of the
1389            // Ingestion that the export is associated with.
1390            let ingestion_collection = self
1391                .collections
1392                .get_mut(&ingestion_id)
1393                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
1394
1395            match &mut ingestion_collection.data_source {
1396                DataSource::Ingestion(ingestion_desc) => {
1397                    let source_export = ingestion_desc
1398                        .source_exports
1399                        .get_mut(&source_export_id)
1400                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1401
1402                    // If the data config hasn't changed, there's no sense in
1403                    // re-rendering the dataflow.
1404                    if source_export.data_config != new_data_config {
1405                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
1406                        source_export.data_config = new_data_config;
1407
1408                        ingestions_to_run.insert(ingestion_id);
1409                    } else {
1410                        tracing::warn!(
1411                            "alter_ingestion_export_data_configs called on \
1412                                    export {source_export_id} of {ingestion_id} but \
1413                                    the data config was the same"
1414                        );
1415                    }
1416                }
1417                o => {
1418                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1419                    Err(StorageError::IdentifierInvalid(ingestion_id))?
1420                }
1421            }
1422        }
1423
1424        for id in ingestions_to_run {
1425            self.run_ingestion(id)?;
1426        }
1427        Ok(())
1428    }
1429
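    /// Evolves the schema of a table: registers `new_collection` with `new_desc`
    /// against the existing data shard and repoints the existing collection at the
    /// new primary collection.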
1430    async fn alter_table_desc(
1431        &mut self,
1432        existing_collection: GlobalId,
1433        new_collection: GlobalId,
1434        new_desc: RelationDesc,
1435        expected_version: RelationVersion,
1436        register_ts: Self::Timestamp,
1437    ) -> Result<(), StorageError<Self::Timestamp>> {
1438        let data_shard = {
1439            let Controller {
1440                collections,
1441                storage_collections,
1442                ..
1443            } = self;
1444
1445            let existing = collections
1446                .get(&existing_collection)
1447                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
1448            if !matches!(existing.data_source, DataSource::Table { .. }) {
1449                return Err(StorageError::IdentifierInvalid(existing_collection));
1450            }
1451
1452            // Let StorageCollections know!
1453            storage_collections
1454                .alter_table_desc(
1455                    existing_collection,
1456                    new_collection,
1457                    new_desc.clone(),
1458                    expected_version,
1459                )
1460                .await?;
1461
1462            existing.collection_metadata.data_shard.clone()
1463        };
1464
1465        let persist_client = self
1466            .persist
1467            .open(self.persist_location.clone())
1468            .await
1469            .expect("invalid persist location");
1470        let write_handle = self
1471            .open_data_handles(
1472                &existing_collection,
1473                data_shard,
1474                new_desc.clone(),
1475                &persist_client,
1476            )
1477            .await;
1478
1479        // Note: The new collection is now the "primary collection" so we specify `None` here.
1480        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
1481        let collection_meta = CollectionMetadata {
1482            persist_location: self.persist_location.clone(),
1483            data_shard,
1484            relation_desc: new_desc.clone(),
1485            // TODO(alter_table): Support schema evolution on sources.
1486            remap_shard: None,
1487            txns_shard: Some(self.txns_read.txns_id().clone()),
1488        };
1489        // TODO(alter_table): Support schema evolution on sources.
1490        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
1491        let collection_state = CollectionState::new(
1492            collection_desc.data_source.clone(),
1493            collection_meta,
1494            CollectionStateExtra::None,
1495            wallclock_lag_metrics,
1496        );
1497
1498        // Great! We have successfully evolved the schema of our Table, so now we need to
1499        // update our in-memory data structures.
1500        self.collections.insert(new_collection, collection_state);
1501        let existing = self
1502            .collections
1503            .get_mut(&existing_collection)
1504            .expect("missing existing collection");
1505        assert!(matches!(
1506            existing.data_source,
1507            DataSource::Table { primary: None }
1508        ));
1509        existing.data_source = DataSource::Table {
1510            primary: Some(new_collection),
1511        };
1512
1513        self.persist_table_worker
1514            .register(register_ts, vec![(new_collection, write_handle)])
1515            .await
1516            .expect("table worker unexpectedly shut down");
1517
1518        Ok(())
1519    }
1520
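    /// Returns a reference to the [`ExportState`] of the identified collection, or
    /// an error if the collection does not exist or is not an export.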
1521    fn export(
1522        &self,
1523        id: GlobalId,
1524    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1525        self.collections
1526            .get(&id)
1527            .and_then(|c| match &c.extra_state {
1528                CollectionStateExtra::Export(state) => Some(state),
1529                _ => None,
1530            })
1531            .ok_or(StorageError::IdentifierMissing(id))
1532    }
1533
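    /// Like [`Self::export`], but returns a mutable reference to the
    /// [`ExportState`].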
1534    fn export_mut(
1535        &mut self,
1536        id: GlobalId,
1537    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1538        self.collections
1539            .get_mut(&id)
1540            .and_then(|c| match &mut c.extra_state {
1541                CollectionStateExtra::Export(state) => Some(state),
1542                _ => None,
1543            })
1544            .ok_or(StorageError::IdentifierMissing(id))
1545    }
1546
1547    /// Create a oneshot ingestion.
1548    async fn create_oneshot_ingestion(
1549        &mut self,
1550        ingestion_id: uuid::Uuid,
1551        collection_id: GlobalId,
1552        instance_id: StorageInstanceId,
1553        request: OneshotIngestionRequest,
1554        result_tx: OneshotResultCallback<ProtoBatch>,
1555    ) -> Result<(), StorageError<Self::Timestamp>> {
1556        let collection_meta = self
1557            .collections
1558            .get(&collection_id)
1559            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1560            .collection_metadata
1561            .clone();
1562        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1563            // TODO(cf2): Refine this error.
1564            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1565        })?;
1566        let oneshot_cmd = RunOneshotIngestion {
1567            ingestion_id,
1568            collection_id,
1569            collection_meta,
1570            request,
1571        };
1572
1573        if !self.read_only {
1574            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1575            let pending = PendingOneshotIngestion {
1576                result_tx,
1577                cluster_id: instance_id,
1578            };
1579            let novel = self
1580                .pending_oneshot_ingestions
1581                .insert(ingestion_id, pending);
1582            assert_none!(novel);
1583            Ok(())
1584        } else {
1585            Err(StorageError::ReadOnly)
1586        }
1587    }
1588
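    /// Cancels a pending oneshot ingestion: the owning cluster is told to stop it
    /// and the waiting caller is notified of the cancellation.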
1589    fn cancel_oneshot_ingestion(
1590        &mut self,
1591        ingestion_id: uuid::Uuid,
1592    ) -> Result<(), StorageError<Self::Timestamp>> {
1593        if self.read_only {
1594            return Err(StorageError::ReadOnly);
1595        }
1596
1597        let pending = self
1598            .pending_oneshot_ingestions
1599            .remove(&ingestion_id)
1600            .ok_or_else(|| {
1601                // TODO(cf2): Refine this error.
1602                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1603            })?;
1604
1605        match self.instances.get_mut(&pending.cluster_id) {
1606            Some(instance) => {
1607                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1608            }
1609            None => {
1610                mz_ore::soft_panic_or_log!(
1611                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1612                    ingestion_id,
1613                    pending.cluster_id,
1614                );
1615            }
1616        }
1617        // Respond to the user that the request has been canceled.
1618        pending.cancel();
1619
1620        Ok(())
1621    }
1622
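    /// Alters the identified export (sink) to the new description, re-rendering
    /// the sink dataflow with freshly acquired read holds.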
1623    async fn alter_export(
1624        &mut self,
1625        id: GlobalId,
1626        new_description: ExportDescription<Self::Timestamp>,
1627    ) -> Result<(), StorageError<Self::Timestamp>> {
1628        let from_id = new_description.sink.from;
1629
1630        // Acquire read holds at StorageCollections to ensure that the
1631        // sinked collection is not dropped while we're sinking it.
1632        let desired_read_holds = vec![from_id.clone(), id.clone()];
1633        let [input_hold, self_hold] = self
1634            .storage_collections
1635            .acquire_read_holds(desired_read_holds)
1636            .expect("missing dependency")
1637            .try_into()
1638            .expect("expected number of holds");
1639        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1640        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1641
1642        // Check whether the sink's write frontier is beyond the read hold we got
1643        let cur_export = self.export_mut(id)?;
1644        let input_readable = cur_export
1645            .write_frontier
1646            .iter()
1647            .all(|t| input_hold.since().less_than(t));
1648        if !input_readable {
1649            return Err(StorageError::ReadBeforeSince(from_id));
1650        }
1651
1652        let new_export = ExportState {
1653            read_capabilities: cur_export.read_capabilities.clone(),
1654            cluster_id: new_description.instance_id,
1655            derived_since: cur_export.derived_since.clone(),
1656            read_holds: [input_hold, self_hold],
1657            read_policy: cur_export.read_policy.clone(),
1658            write_frontier: cur_export.write_frontier.clone(),
1659        };
1660        *cur_export = new_export;
1661
1662        let cmd = RunSinkCommand {
1663            id,
1664            description: StorageSinkDesc {
1665                from: from_id,
1666                from_desc: new_description.sink.from_desc,
1667                connection: new_description.sink.connection,
1668                envelope: new_description.sink.envelope,
1669                as_of: new_description.sink.as_of,
1670                version: new_description.sink.version,
1671                from_storage_metadata,
1672                with_snapshot: new_description.sink.with_snapshot,
1673                to_storage_metadata,
1674            },
1675        };
1676
1677        // Fetch the client for this export's cluster.
1678        let instance = self
1679            .instances
1680            .get_mut(&new_description.instance_id)
1681            .ok_or_else(|| StorageError::ExportInstanceMissing {
1682                storage_instance_id: new_description.instance_id,
1683                export_id: id,
1684            })?;
1685
1686        instance.send(StorageCommand::RunSink(Box::new(cmd)));
1687        Ok(())
1688    }
1689
1690    /// Alter the connections of the identified sinks and re-render them with the updated connections.
1691    async fn alter_export_connections(
1692        &mut self,
1693        exports: BTreeMap<GlobalId, StorageSinkConnection>,
1694    ) -> Result<(), StorageError<Self::Timestamp>> {
1695        let mut updates_by_instance =
1696            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();
1697
1698        for (id, connection) in exports {
1699            // We stage changes in new_export_description and then apply all
1700            // updates to exports at the end.
1701            //
1702            // We don't just go ahead and clone the `ExportState` itself and
1703            // update that because `ExportState` is not clone, because it holds
1704            // a `ReadHandle` and cloning that would cause additional work for
1705            // whoever guarantees those read holds.
1706            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
1707                let export = &self.collections[&id];
1708                let DataSource::Sink { desc } = &export.data_source else {
1709                    panic!("export known to exist")
1710                };
1711                let CollectionStateExtra::Export(state) = &export.extra_state else {
1712                    panic!("export known to exist")
1713                };
1714                let export_description = desc.clone();
1715                let as_of = state.input_hold().since().clone();
1716
1717                (export_description, as_of)
1718            };
1719            let current_sink = new_export_description.sink.clone();
1720
1721            new_export_description.sink.connection = connection;
1722
1723            // Ensure compatibility
1724            current_sink.alter_compatible(id, &new_export_description.sink)?;
1725
1726            let from_storage_metadata = self
1727                .storage_collections
1728                .collection_metadata(new_export_description.sink.from)?;
1729            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1730
1731            let cmd = RunSinkCommand {
1732                id,
1733                description: StorageSinkDesc {
1734                    from: new_export_description.sink.from,
1735                    from_desc: new_export_description.sink.from_desc.clone(),
1736                    connection: new_export_description.sink.connection.clone(),
1737                    envelope: new_export_description.sink.envelope,
1738                    with_snapshot: new_export_description.sink.with_snapshot,
1739                    version: new_export_description.sink.version,
1740                    // Here we are about to send a RunSinkCommand with the current read capability
1741                    // held by this sink. However, clusters are already running a version of the
1742                    // sink and nothing guarantees that by the time this command arrives at the
1743                    // clusters they won't have made additional progress such that this read
1744                    // capability is invalidated.
1745                    // The solution to this problem is for the controller to track specific
1746                    // executions of dataflows such that it can track the shutdown of the current
1747                    // instance and the initialization of the new instance separately and ensure
1748                    // read holds are held for the correct amount of time.
1749                    // TODO(petrosagg): change the controller to explicitly track dataflow executions
1750                    as_of: as_of.to_owned(),
1751                    from_storage_metadata,
1752                    to_storage_metadata,
1753                },
1754            };
1755
1756            let update = updates_by_instance
1757                .entry(new_export_description.instance_id)
1758                .or_default();
1759            update.push((cmd, new_export_description));
1760        }
1761
1762        for (instance_id, updates) in updates_by_instance {
1763            let mut export_updates = BTreeMap::new();
1764            let mut cmds = Vec::with_capacity(updates.len());
1765
1766            for (cmd, export_state) in updates {
1767                export_updates.insert(cmd.id, export_state);
1768                cmds.push(cmd);
1769            }
1770
1771            // Fetch the client for this export's cluster.
1772            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1773                StorageError::ExportInstanceMissing {
1774                    storage_instance_id: instance_id,
1775                    export_id: *export_updates
1776                        .keys()
1777                        .next()
1778                        .expect("set of exports not empty"),
1779                }
1780            })?;
1781
1782            for cmd in cmds {
1783                instance.send(StorageCommand::RunSink(Box::new(cmd)));
1784            }
1785
1786            // Update state only after all possible errors have occurred.
1787            for (id, new_export_description) in export_updates {
1788                let Some(state) = self.collections.get_mut(&id) else {
1789                    panic!("export known to exist")
1790                };
1791                let DataSource::Sink { desc } = &mut state.data_source else {
1792                    panic!("export known to exist")
1793                };
1794                *desc = new_export_description;
1795            }
1796        }
1797
1798        Ok(())
1799    }
1800
1801    // Dropping a table takes roughly the following flow:
1802    //
1803    // First determine if this is a TableWrites table or a source-fed table (an IngestionExport):
1804    //
1805    // If this is a TableWrites table:
1806    //   1. We remove the table from the persist table write worker.
1807    //   2. The table removal is awaited in an async task.
1808    //   3. A message is sent to the storage controller that the table has been removed from the
1809    //      table write worker.
1810    //   4. The controller drains all table drop messages during `process`.
1811    //   5. `process` calls `drop_sources` with the dropped tables.
1812    //
1813    // If this is an IngestionExport table:
1814    //   1. We validate the ids and then call drop_sources_unvalidated to proceed dropping.
1815    fn drop_tables(
1816        &mut self,
1817        storage_metadata: &StorageMetadata,
1818        identifiers: Vec<GlobalId>,
1819        ts: Self::Timestamp,
1820    ) -> Result<(), StorageError<Self::Timestamp>> {
1821        // Collect tables by their data_source
1822        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1823            .into_iter()
1824            .partition(|id| match self.collections[id].data_source {
1825                DataSource::Table { .. } => true,
1826                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1827                _ => panic!("identifier is not a table: {}", id),
1828            });
1829
1830        // Drop table write tables
1831        if !table_write_ids.is_empty() {
1832            let drop_notif = self
1833                .persist_table_worker
1834                .drop_handles(table_write_ids.clone(), ts);
1835            let tx = self.pending_table_handle_drops_tx.clone();
1836            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1837                drop_notif.await;
1838                for identifier in table_write_ids {
1839                    let _ = tx.send(identifier);
1840                }
1841            });
1842        }
1843
1844        // Drop source-fed tables
1845        if !data_source_ids.is_empty() {
1846            self.validate_collection_ids(data_source_ids.iter().cloned())?;
1847            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1848        }
1849
1850        Ok(())
1851    }
1852
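    /// Drops the identified source collections, after validating that all
    /// identifiers refer to known collections.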
1853    fn drop_sources(
1854        &mut self,
1855        storage_metadata: &StorageMetadata,
1856        identifiers: Vec<GlobalId>,
1857    ) -> Result<(), StorageError<Self::Timestamp>> {
1858        self.validate_collection_ids(identifiers.iter().cloned())?;
1859        self.drop_sources_unvalidated(storage_metadata, identifiers)
1860    }
1861
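    /// Drops the identified source collections without validating the identifiers.
    /// Dependent ingestions are re-run or dropped as needed, read-hold policies are
    /// released, and statistics and collection state are cleaned up.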
1862    fn drop_sources_unvalidated(
1863        &mut self,
1864        storage_metadata: &StorageMetadata,
1865        ids: Vec<GlobalId>,
1866    ) -> Result<(), StorageError<Self::Timestamp>> {
1867        // Keep track of which ingestions we have to execute still, and which we
1868        // have to change because of dropped subsources.
1869        let mut ingestions_to_execute = BTreeSet::new();
1870        let mut ingestions_to_drop = BTreeSet::new();
1871        let mut source_statistics_to_drop = Vec::new();
1872
1873        // Ingestions (and their exports) are also collections, but we keep
1874        // track of non-ingestion collections separately, because they have
1875        // slightly different cleanup logic below.
1876        let mut collections_to_drop = Vec::new();
1877
1878        for id in ids.iter() {
1879            let metadata = storage_metadata.get_collection_shard::<T>(*id);
1880            mz_ore::soft_assert_or_log!(
1881                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
1882                "dropping {id}, but drop was not synchronized with storage \
1883                controller via `synchronize_collections`"
1884            );
1885
1886            let collection_state = self.collections.get(id);
1887
1888            if let Some(collection_state) = collection_state {
1889                match collection_state.data_source {
1890                    DataSource::Webhook => {
1891                        // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
1892                        // could probably use some love and maybe get merged together?
1893                        let fut = self.collection_manager.unregister_collection(*id);
1894                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);
1895
1896                        collections_to_drop.push(*id);
1897                        source_statistics_to_drop.push(*id);
1898                    }
1899                    DataSource::Ingestion(_) => {
1900                        ingestions_to_drop.insert(*id);
1901                        source_statistics_to_drop.push(*id);
1902                    }
1903                    DataSource::IngestionExport { ingestion_id, .. } => {
1904                        // If we are dropping a source export, we need to modify the
1905                        // ingestion that it runs on.
1906                        //
1907                        // If we remove this export, we need to stop producing data to
1908                        // it, so plan to re-execute the ingestion with the amended
1909                        // description.
1910                        ingestions_to_execute.insert(ingestion_id);
1911
1912                        // Adjust the source to remove this export.
1913                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
1914                            Some(ingestion_collection) => ingestion_collection,
1915                            // Primary ingestion already dropped.
1916                            None => {
1917                                tracing::error!(
1918                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
1919                                );
1920                                continue;
1921                            }
1922                        };
1923
1924                        match &mut ingestion_state.data_source {
1925                            DataSource::Ingestion(ingestion_desc) => {
1926                                let removed = ingestion_desc.source_exports.remove(id);
1927                                mz_ore::soft_assert_or_log!(
1928                                    removed.is_some(),
1929                                    "dropped subsource {id} already removed from source exports"
1930                                );
1931                            }
1932                            _ => unreachable!(
1933                                "SourceExport must only refer to primary sources that already exist"
1934                            ),
1935                        };
1936
1937                        // Ingestion exports also have ReadHolds that we need to
1938                        // downgrade, and much of their drop machinery is the
1939                        // same as for the "main" ingestion.
1940                        ingestions_to_drop.insert(*id);
1941                        source_statistics_to_drop.push(*id);
1942                    }
1943                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
1944                        collections_to_drop.push(*id);
1945                    }
1946                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
1947                        // Collections of these types are either not sources and should be dropped
1948                        // through other means, or are sources but should never be dropped.
1949                        soft_panic_or_log!(
1950                            "drop_sources called on a {:?} (id={id})",
1951                            collection_state.data_source,
1952                        );
1953                    }
1954                }
1955            }
1956        }
1957
1958        // Do not bother re-executing ingestions we know we plan to drop.
1959        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
1960        for ingestion_id in ingestions_to_execute {
1961            self.run_ingestion(ingestion_id)?;
1962        }
1963
1964        // For ingestions, we fabricate a new hold that will propagate through
1965        // the cluster and then back to us.
1966
1967        // We don't explicitly remove read capabilities! Downgrading the
1968        // frontier of the source to `[]` (the empty Antichain), will propagate
1969        // to the storage dependencies.
1970        let ingestion_policies = ingestions_to_drop
1971            .iter()
1972            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
1973            .collect();
1974
1975        tracing::debug!(
1976            ?ingestion_policies,
1977            "dropping sources by setting read hold policies"
1978        );
1979        self.set_hold_policies(ingestion_policies);
1980
1981        // Delete all collection->shard mappings
1982        let shards_to_update: BTreeSet<_> = ingestions_to_drop
1983            .iter()
1984            .chain(collections_to_drop.iter())
1985            .cloned()
1986            .collect();
1987        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);
1988
1989        let status_now = mz_ore::now::to_datetime((self.now)());
1990        let mut status_updates = vec![];
1991        for id in ingestions_to_drop.iter() {
1992            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
1993        }
1994
1995        if !self.read_only {
1996            self.append_status_introspection_updates(
1997                IntrospectionType::SourceStatusHistory,
1998                status_updates,
1999            );
2000        }
2001
2002        {
2003            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
2004            for id in source_statistics_to_drop {
2005                source_statistics.source_statistics.remove(&id);
2006                source_statistics.webhook_statistics.remove(&id);
2007            }
2008        }
2009
2010        // Remove collection state
2011        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
2012            tracing::info!(%id, "dropping collection state");
2013            let collection = self
2014                .collections
2015                .remove(id)
2016                .expect("list populated after checking that self.collections contains it");
2017
2018            let instance = match &collection.extra_state {
2019                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2020                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2021                CollectionStateExtra::None => None,
2022            }
2023            .and_then(|i| self.instances.get(&i));
2024
2025            // Record which replicas were running a collection, so that we can
2026            // match DroppedId messages against them and eventually remove state
2027            // from self.dropped_objects
2028            if let Some(instance) = instance {
2029                let active_replicas = instance.get_active_replicas_for_object(id);
2030                if !active_replicas.is_empty() {
2031                    // The remap collection of an ingestion doesn't have extra
2032                    // state and doesn't have an instance_id, but we still get
2033                    // upper updates for it, so we want to make sure to populate
2034                    // dropped_objects.
2035                    // TODO(aljoscha): All this is a bit icky. But here we are
2036                    // for now...
2037                    match &collection.data_source {
2038                        DataSource::Ingestion(ingestion_desc) => {
2039                            self.dropped_objects.insert(
2040                                ingestion_desc.remap_collection_id,
2041                                active_replicas.clone(),
2042                            );
2043                        }
2044                        _ => {}
2045                    }
2046
2047                    self.dropped_objects.insert(*id, active_replicas);
2048                }
2049            }
2050        }
2051
2052        // Also let StorageCollections know!
2053        self.storage_collections
2054            .drop_collections_unvalidated(storage_metadata, ids);
2055
2056        Ok(())
2057    }
2058
2059    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
2060    fn drop_sinks(
2061        &mut self,
2062        storage_metadata: &StorageMetadata,
2063        identifiers: Vec<GlobalId>,
2064    ) -> Result<(), StorageError<Self::Timestamp>> {
2065        self.validate_export_ids(identifiers.iter().cloned())?;
2066        self.drop_sinks_unvalidated(storage_metadata, identifiers);
2067        Ok(())
2068    }
2069
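    /// Drops the identified sinks without validating the identifiers: read-hold
    /// policies are released, `Dropped` status updates are recorded, and statistics
    /// and export state are removed.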
2070    fn drop_sinks_unvalidated(
2071        &mut self,
2072        storage_metadata: &StorageMetadata,
2073        mut sinks_to_drop: Vec<GlobalId>,
2074    ) {
2075        // Ignore exports that have already been removed.
2076        sinks_to_drop.retain(|id| self.export(*id).is_ok());
2077
2078        // TODO: ideally we'd advance the write frontier ourselves here, but this function's
2079        // not yet marked async.
2080
2081        // We don't explicitly remove read capabilities! Downgrading the
2082        // frontier of the source to `[]` (the empty Antichain), will propagate
2083        // to the storage dependencies.
2084        let drop_policy = sinks_to_drop
2085            .iter()
2086            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2087            .collect();
2088
2089        tracing::debug!(
2090            ?drop_policy,
2091            "dropping sinks by setting read hold policies"
2092        );
2093        self.set_hold_policies(drop_policy);
2094
2095        // Record the drop status for all sink drops.
2096        //
2097        // We also delete the items' statistics objects.
2098        //
2099        // The locks are held for a short time, only while we do some removals from a map.
2100
2101        let status_now = mz_ore::now::to_datetime((self.now)());
2102
2103        // Record the drop status for all pending sink drops.
2104        let mut status_updates = vec![];
2105        {
2106            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2107            for id in sinks_to_drop.iter() {
2108                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2109                sink_statistics.remove(id);
2110            }
2111        }
2112
2113        if !self.read_only {
2114            self.append_status_introspection_updates(
2115                IntrospectionType::SinkStatusHistory,
2116                status_updates,
2117            );
2118        }
2119
2120        // Remove collection/export state
2121        for id in sinks_to_drop.iter() {
2122            tracing::info!(%id, "dropping export state");
2123            let collection = self
2124                .collections
2125                .remove(id)
2126                .expect("list populated after checking that self.collections contains it");
2127
2128            let instance = match &collection.extra_state {
2129                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2130                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2131                CollectionStateExtra::None => None,
2132            }
2133            .and_then(|i| self.instances.get(&i));
2134
2135            // Record how many replicas were running an export, so that we can
2136            // match `DroppedId` messages against it and eventually remove state
2137            // from `self.dropped_objects`.
2138            if let Some(instance) = instance {
2139                let active_replicas = instance.get_active_replicas_for_object(id);
2140                if !active_replicas.is_empty() {
2141                    self.dropped_objects.insert(*id, active_replicas);
2142                }
2143            }
2144        }
2145
2146        // Also let StorageCollections know!
2147        self.storage_collections
2148            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2149    }
2150
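    /// Appends `commands` to the given tables at `write_ts` and advances their
    /// uppers to `advance_to`, delegating the writes to the persist table worker.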
2151    #[instrument(level = "debug")]
2152    fn append_table(
2153        &mut self,
2154        write_ts: Self::Timestamp,
2155        advance_to: Self::Timestamp,
2156        commands: Vec<(GlobalId, Vec<TableData>)>,
2157    ) -> Result<
2158        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2159        StorageError<Self::Timestamp>,
2160    > {
2161        if self.read_only {
2162            // While in read-only mode, ONLY collections that have been migrated
2163            // and need to be re-hydrated in read-only mode can be written to.
2164            if !commands
2165                .iter()
2166                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2167            {
2168                return Err(StorageError::ReadOnly);
2169            }
2170        }
2171
2172        // TODO(petrosagg): validate appends against the expected RelationDesc of the collection
2173        for (id, updates) in commands.iter() {
2174            if !updates.is_empty() {
2175                if !write_ts.less_than(&advance_to) {
2176                    return Err(StorageError::UpdateBeyondUpper(*id));
2177                }
2178            }
2179        }
2180
2181        Ok(self
2182            .persist_table_worker
2183            .append(write_ts, advance_to, commands))
2184    }
2185
2186    fn monotonic_appender(
2187        &self,
2188        id: GlobalId,
2189    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
2190        self.collection_manager.monotonic_appender(id)
2191    }
2192
2193    fn webhook_statistics(
2194        &self,
2195        id: GlobalId,
2196    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2197        // Calls to this method are usually cached, so the lock is not on the critical path.
2198        let source_statistics = self.source_statistics.lock().expect("poisoned");
2199        source_statistics
2200            .webhook_statistics
2201            .get(&id)
2202            .cloned()
2203            .ok_or(StorageError::IdentifierMissing(id))
2204    }
2205
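    /// Completes when there is work for `process` to do: new responses from storage
    /// instances have arrived or the maintenance interval has elapsed. Returns
    /// immediately if maintenance is already scheduled or table drops are pending.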
2206    async fn ready(&mut self) {
2207        if self.maintenance_scheduled {
2208            return;
2209        }
2210
2211        if !self.pending_table_handle_drops_rx.is_empty() {
2212            return;
2213        }
2214
2215        tokio::select! {
2216            Some(m) = self.instance_response_rx.recv() => {
2217                self.stashed_responses.push(m);
2218                while let Ok(m) = self.instance_response_rx.try_recv() {
2219                    self.stashed_responses.push(m);
2220                }
2221            }
2222            _ = self.maintenance_ticker.tick() => {
2223                self.maintenance_scheduled = true;
2224            },
2225        };
2226    }
2227
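    /// Processes work scheduled by `ready`: performs maintenance, incorporates
    /// stashed instance responses (frontier updates, drop acknowledgements,
    /// statistics, status updates, staged batches), drains pending table drops, and
    /// reports any updated write frontiers.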
2228    #[instrument(level = "debug")]
2229    fn process(
2230        &mut self,
2231        storage_metadata: &StorageMetadata,
2232    ) -> Result<Option<Response<T>>, anyhow::Error> {
2233        // Perform periodic maintenance work.
2234        if self.maintenance_scheduled {
2235            self.maintain();
2236            self.maintenance_scheduled = false;
2237        }
2238
2239        for instance in self.instances.values_mut() {
2240            instance.rehydrate_failed_replicas();
2241        }
2242
2243        let mut status_updates = vec![];
2244        let mut updated_frontiers = BTreeMap::new();
2245
2246        // Take the currently stashed responses so that we can call mut receiver functions in the loop.
2247        let stashed_responses = std::mem::take(&mut self.stashed_responses);
2248        for resp in stashed_responses {
2249            match resp {
2250                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
2251                    self.update_write_frontier(id, &upper);
2252                    updated_frontiers.insert(id, upper);
2253                }
2254                (replica_id, StorageResponse::DroppedId(id)) => {
2255                    let replica_id = replica_id.expect("DroppedId from unknown replica");
2256                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
2257                        remaining_replicas.remove(&replica_id);
2258                        if remaining_replicas.is_empty() {
2259                            self.dropped_objects.remove(&id);
2260                        }
2261                    } else {
2262                        soft_panic_or_log!("unexpected DroppedId for {id}");
2263                    }
2264                }
2265                (_replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
2266                    // Note we only hold the locks while moving some plain-old-data around here.
2267                    //
2268                    // We just write the whole object, as the update from storage represents the
2269                    // current values.
2270                    //
2271                    // We don't overwrite removed objects, as we may have received a late
2272                    // `StatisticsUpdates` while we were shutting down the storage object.
2273                    {
2274                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");
2275                        for stat in source_stats {
2276                            // Don't overwrite it if it's been removed.
2277                            shared_stats
2278                                .source_statistics
2279                                .entry(stat.id)
2280                                .and_modify(|current| current.stat().incorporate(stat));
2281                        }
2282                    }
2283
2284                    {
2285                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
2286                        for stat in sink_stats {
2287                            // Don't overwrite it if it's been removed.
2288                            shared_stats
2289                                .entry(stat.id)
2290                                .and_modify(|current| current.stat().incorporate(stat));
2291                        }
2292                    }
2293                }
2294                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
2295                    // NOTE(aljoscha): We sniff out the hydration status for
2296                    // ingestions from status updates. This is the easiest we
2297                    // can do right now, without going deeper into changing the
2298                    // comms protocol between controller and cluster. We cannot,
2299                    // for example use `StorageResponse::FrontierUpper`,
2300                    // because those will already get sent when the ingestion is
2301                    // just being created.
2302                    //
2303                    // Sources differ in when they will report as Running. Kafka
2304                    // UPSERT sources will only switch to `Running` once their
2305                    // state has been initialized from persist, which is the
2306                    // first case that we care about right now.
2307                    //
2308                    // I wouldn't say it's ideal, but it's workable until we
2309                    // find something better.
2310                    match status_update.status {
2311                        Status::Running => {
2312                            let collection = self.collections.get_mut(&status_update.id);
2313                            match collection {
2314                                Some(collection) => {
2315                                    match collection.extra_state {
2316                                        CollectionStateExtra::Ingestion(
2317                                            ref mut ingestion_state,
2318                                        ) => {
2319                                            if ingestion_state.hydrated_on.is_empty() {
2320                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
2321                                            }
2322                                            ingestion_state.hydrated_on.insert(replica_id.expect(
2323                                                "replica id should be present for status running",
2324                                            ));
2325                                        }
2326                                        CollectionStateExtra::Export(_) => {
2327                                            // TODO(sinks): track sink hydration?
2328                                        }
2329                                        CollectionStateExtra::None => {
2330                                            // Nothing to do
2331                                        }
2332                                    }
2333                                }
2334                                None => (), // no collection, let's say that's fine
2335                                            // here
2336                            }
2337                        }
2338                        Status::Paused => {
2339                            let collection = self.collections.get_mut(&status_update.id);
2340                            match collection {
2341                                Some(collection) => {
2342                                    match collection.extra_state {
2343                                        CollectionStateExtra::Ingestion(
2344                                            ref mut ingestion_state,
2345                                        ) => {
2346                                            // TODO: Paused gets sent when there
2347                                            // are no active replicas. We should
2348                                            // change this to send a targeted
2349                                            // Pause for each replica, and do
2350                                            // more fine-grained hydration
2351                                            // tracking here.
2352                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
2353                                            ingestion_state.hydrated_on.clear();
2354                                        }
2355                                        CollectionStateExtra::Export(_) => {
2356                                            // TODO(sinks): track sink hydration?
2357                                        }
2358                                        CollectionStateExtra::None => {
2359                                            // Nothing to do
2360                                        }
2361                                    }
2362                                }
2363                                None => (), // no collection, let's say that's fine
2364                                            // here
2365                            }
2366                        }
2367                        _ => (),
2368                    }
2369
2370                    // Set replica_id in the status update if available
2371                    if let Some(id) = replica_id {
2372                        status_update.replica_id = Some(id);
2373                    }
2374                    status_updates.push(status_update);
2375                }
2376                (_replica_id, StorageResponse::StagedBatches(batches)) => {
2377                    for (ingestion_id, batches) in batches {
2378                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
2379                            Some(pending) => {
2380                                // Send a cancel command so our command history is correct, and to
2381                                // avoid duplicate work once we have active replication.
2382                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
2383                                {
2384                                    instance
2385                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
2386                                }
2387                                // Send the results down our channel.
2388                                (pending.result_tx)(batches)
2389                            }
2390                            None => {
2391                                // We might not be tracking this oneshot ingestion anymore because
2392                                // it was canceled.
2393                            }
2394                        }
2395                    }
2396                }
2397            }
2398        }
2399
2400        self.record_status_updates(status_updates);
2401
2402        // Process dropped tables in a single batch.
2403        let mut dropped_table_ids = Vec::new();
2404        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
2405            dropped_table_ids.push(dropped_id);
2406        }
2407        if !dropped_table_ids.is_empty() {
2408            self.drop_sources(storage_metadata, dropped_table_ids)?;
2409        }
2410
2411        if updated_frontiers.is_empty() {
2412            Ok(None)
2413        } else {
2414            Ok(Some(Response::FrontierUpdates(
2415                updated_frontiers.into_iter().collect(),
2416            )))
2417        }
2418    }
2419
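    /// Returns the state of the persist shard backing the identified collection,
    /// rendered as JSON for debugging and introspection.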
2420    async fn inspect_persist_state(
2421        &self,
2422        id: GlobalId,
2423    ) -> Result<serde_json::Value, anyhow::Error> {
2424        let collection = &self.storage_collections.collection_metadata(id)?;
2425        let client = self
2426            .persist
2427            .open(collection.persist_location.clone())
2428            .await?;
2429        let shard_state = client
2430            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2431            .await?;
2432        let json_state = serde_json::to_value(shard_state)?;
2433        Ok(json_state)
2434    }
2435
2436    fn append_introspection_updates(
2437        &mut self,
2438        type_: IntrospectionType,
2439        updates: Vec<(Row, Diff)>,
2440    ) {
2441        let id = self.introspection_ids[&type_];
2442        let updates = updates.into_iter().map(|update| update.into()).collect();
2443        self.collection_manager.blind_write(id, updates);
2444    }
2445
2446    fn append_status_introspection_updates(
2447        &mut self,
2448        type_: IntrospectionType,
2449        updates: Vec<StatusUpdate>,
2450    ) {
2451        let id = self.introspection_ids[&type_];
2452        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2453        if !updates.is_empty() {
2454            self.collection_manager.blind_write(id, updates);
2455        }
2456    }
2457
2458    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2459        let id = self.introspection_ids[&type_];
2460        self.collection_manager.differential_write(id, op);
2461    }
2462
2463    fn append_only_introspection_tx(
2464        &self,
2465        type_: IntrospectionType,
2466    ) -> mpsc::UnboundedSender<(
2467        Vec<AppendOnlyUpdate>,
2468        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2469    )> {
2470        let id = self.introspection_ids[&type_];
2471        self.collection_manager.append_only_write_sender(id)
2472    }
2473
2474    fn differential_introspection_tx(
2475        &self,
2476        type_: IntrospectionType,
2477    ) -> mpsc::UnboundedSender<(
2478        StorageWriteOp,
2479        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2480    )> {
2481        let id = self.introspection_ids[&type_];
2482        self.collection_manager.differential_write_sender(id)
2483    }
2484
2485    async fn real_time_recent_timestamp(
2486        &self,
2487        timestamp_objects: BTreeSet<GlobalId>,
2488        timeout: Duration,
2489    ) -> Result<
2490        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
2491        StorageError<Self::Timestamp>,
2492    > {
2493        use mz_storage_types::sources::GenericSourceConnection;
2494
2495        let mut rtr_futures = BTreeMap::new();
2496
2497        // Only user sources can be read with real-time recency (RTR).
2498        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
2499            let collection = match self.collection(id) {
2500                Ok(c) => c,
2501                // Not a storage item, which we accept.
2502                Err(_) => continue,
2503            };
2504
2505            let (source_conn, remap_id) = match &collection.data_source {
2506                DataSource::Ingestion(IngestionDescription {
2507                    desc: SourceDesc { connection, .. },
2508                    remap_collection_id,
2509                    ..
2510                }) => match connection {
2511                    GenericSourceConnection::Kafka(_)
2512                    | GenericSourceConnection::Postgres(_)
2513                    | GenericSourceConnection::MySql(_)
2514                    | GenericSourceConnection::SqlServer(_) => {
2515                        (connection.clone(), *remap_collection_id)
2516                    }
2517
2518                    // These internal sources do not yet (and might never)
2519                    // support RTR. However, erroring when they're selected from
2520                    // would make for an annoying user experience, so instead we just
2521                    // skip over them.
2522                    GenericSourceConnection::LoadGenerator(_) => continue,
2523                },
2524                // Skip over all other objects
2525                _ => {
2526                    continue;
2527                }
2528            };
2529
2530            // Prepare for getting the external system's frontier.
2531            let config = self.config().clone();
2532
2533            // Determine the remap collection we plan to read from.
2534            //
2535            // Note that reading from the remap shard here mirrors how other parts of
2536            // this code do it, but we inline the logic because we must prove that we
2537            // have not taken ownership of `self` in order to move the stream of data
2538            // from the remap shard into a future.
2540            let read_handle = self.read_handle_for_snapshot(remap_id).await?;
2541
2542            // Have to acquire a read hold to prevent the since from advancing
2543            // while we read.
2544            let remap_read_hold = self
2545                .storage_collections
2546                .acquire_read_holds(vec![remap_id])
2547                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
2548                .expect_element(|| "known to be exactly one");
2549
2550            let remap_as_of = remap_read_hold
2551                .since()
2552                .to_owned()
2553                .into_option()
2554                .ok_or(StorageError::ReadBeforeSince(remap_id))?;
2555
2556            rtr_futures.insert(
2557                id,
2558                tokio::time::timeout(timeout, async move {
2559                    use mz_storage_types::sources::SourceConnection as _;
2560
2561                    // Fetch the remap shard's contents; we must do this first so
2562                    // that the `as_of` doesn't change.
2563                    let as_of = Antichain::from_elem(remap_as_of);
2564                    let remap_subscribe = read_handle
2565                        .subscribe(as_of.clone())
2566                        .await
2567                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;
2568
2569                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");
2570
2571                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
2572                        .await.map_err(|e| {
2573                            tracing::debug!(?id, "real time recency error: {:?}", e);
2574                            e
2575                        });
2576
2577                    // Drop once we have read successfully.
2578                    drop(remap_read_hold);
2579
2580                    result
2581                }),
2582            );
2583        }
2584
2585        Ok(Box::pin(async move {
2586            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
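            // The overall real-time-recency timestamp is the maximum across all queried
            // sources, so waiting for it ensures each source has caught up to its upstream.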
2587            ids.into_iter()
2588                .zip_eq(futures::future::join_all(futs).await)
2589                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
2590                    let new =
2591                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
2592                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
2593                })
2594        }))
2595    }
2596}
2597
2598/// Seed [`StorageTxn`] with any state required to instantiate a
2599/// [`StorageController`].
2600///
2601/// This cannot be a member of [`StorageController`] because it cannot take a
2602/// `self` parameter.
2603///
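/// A minimal usage sketch (illustrative only; assumes a `txn` value implementing
/// [`StorageTxn`] is already at hand):
///
/// ```ignore
/// // `txn` is assumed to be a `&mut dyn StorageTxn<T>` obtained elsewhere.
/// prepare_initialization(txn)?;
/// let txns_id = txn.get_txn_wal_shard().expect("seeded by prepare_initialization");
/// ```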
2604pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2605    if txn.get_txn_wal_shard().is_none() {
2606        let txns_id = ShardId::new();
2607        txn.write_txn_wal_shard(txns_id)?;
2608    }
2609
2610    Ok(())
2611}
2612
2613impl<T> Controller<T>
2614where
2615    T: Timestamp
2616        + Lattice
2617        + TotalOrder
2618        + Codec64
2619        + From<EpochMillis>
2620        + TimestampManipulation
2621        + Into<Datum<'static>>,
2622    StorageCommand<T>: RustType<ProtoStorageCommand>,
2623    StorageResponse<T>: RustType<ProtoStorageResponse>,
2624    Self: StorageController<Timestamp = T>,
2625{
2626    /// Create a new storage controller.
2627    ///
2628    /// Note that when creating a new storage controller, you must also
2629    /// reconcile it with the previous state.
2630    ///
2631    /// # Panics
2632    /// If this function is called before [`prepare_initialization`].
2633    pub async fn new(
2634        build_info: &'static BuildInfo,
2635        persist_location: PersistLocation,
2636        persist_clients: Arc<PersistClientCache>,
2637        now: NowFn,
2638        wallclock_lag: WallclockLagFn<T>,
2639        txns_metrics: Arc<TxnMetrics>,
2640        envd_epoch: NonZeroI64,
2641        read_only: bool,
2642        metrics_registry: &MetricsRegistry,
2643        controller_metrics: ControllerMetrics,
2644        connection_context: ConnectionContext,
2645        txn: &dyn StorageTxn<T>,
2646        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
2647    ) -> Self {
2648        let txns_client = persist_clients
2649            .open(persist_location.clone())
2650            .await
2651            .expect("location should be valid");
2652
2653        let persist_warm_task = warm_persist_state_in_background(
2654            txns_client.clone(),
2655            txn.get_collection_metadata().into_values(),
2656        );
2657        let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2658
2659        // This value must be already installed because we must ensure it's
2660        // durably recorded before it is used, otherwise we risk leaking persist
2661        // state.
2662        let txns_id = txn
2663            .get_txn_wal_shard()
2664            .expect("must call prepare initialization before creating storage controller");
2665
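        // In read-only mode the table write worker only follows the txns shard's upper;
        // otherwise it opens a full `TxnsHandle` so that it can append to tables.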
2666        let persist_table_worker = if read_only {
2667            let txns_write = txns_client
2668                .open_writer(
2669                    txns_id,
2670                    Arc::new(TxnsCodecRow::desc()),
2671                    Arc::new(UnitSchema),
2672                    Diagnostics {
2673                        shard_name: "txns".to_owned(),
2674                        handle_purpose: "follow txns upper".to_owned(),
2675                    },
2676                )
2677                .await
2678                .expect("txns schema shouldn't change");
2679            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2680        } else {
2681            let txns = TxnsHandle::open(
2682                T::minimum(),
2683                txns_client.clone(),
2684                txns_client.dyncfgs().clone(),
2685                Arc::clone(&txns_metrics),
2686                txns_id,
2687            )
2688            .await;
2689            persist_handles::PersistTableWriteWorker::new_txns(txns)
2690        };
2691        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2692
2693        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2694
2695        let introspection_ids = BTreeMap::new();
2696        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2697
2698        let (statistics_interval_sender, _) =
2699            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2700
2701        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2702            tokio::sync::mpsc::unbounded_channel();
2703
2704        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2705        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2706
2707        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2708
2709        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2710
2711        let now_dt = mz_ore::now::to_datetime(now());
2712
2713        Self {
2714            build_info,
2715            collections: BTreeMap::default(),
2716            dropped_objects: Default::default(),
2717            persist_table_worker,
2718            txns_read,
2719            txns_metrics,
2720            stashed_responses: vec![],
2721            pending_table_handle_drops_tx,
2722            pending_table_handle_drops_rx,
2723            pending_oneshot_ingestions: BTreeMap::default(),
2724            collection_manager,
2725            introspection_ids,
2726            introspection_tokens,
2727            now,
2728            envd_epoch,
2729            read_only,
2730            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2731                source_statistics: BTreeMap::new(),
2732                webhook_statistics: BTreeMap::new(),
2733            })),
2734            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2735            statistics_interval_sender,
2736            instances: BTreeMap::new(),
2737            initialized: false,
2738            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2739            persist_location,
2740            persist: persist_clients,
2741            metrics,
2742            recorded_frontiers: BTreeMap::new(),
2743            recorded_replica_frontiers: BTreeMap::new(),
2744            wallclock_lag,
2745            wallclock_lag_last_recorded: now_dt,
2746            storage_collections,
2747            migrated_storage_collections: BTreeSet::new(),
2748            maintenance_ticker,
2749            maintenance_scheduled: false,
2750            instance_response_rx,
2751            instance_response_tx,
2752            persist_warm_task,
2753        }
2754    }
2755
2756    // This is different from `set_read_policies`, which is for external users.
2757    // This method is for setting the policy that the controller uses when
2758    // maintaining the read holds that it has for collections/exports at the
2759    // StorageCollections.
2760    //
2761    // This is really only used when dropping things, where we set the
2762    // ReadPolicy to the empty Antichain.
2763    #[instrument(level = "debug")]
2764    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
2765        let mut read_capability_changes = BTreeMap::default();
2766
2767        for (id, policy) in policies.into_iter() {
2768            if let Some(collection) = self.collections.get_mut(&id) {
2769                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2770                {
2771                    CollectionStateExtra::Ingestion(ingestion) => (
2772                        ingestion.write_frontier.borrow(),
2773                        &mut ingestion.derived_since,
2774                        &mut ingestion.hold_policy,
2775                    ),
2776                    CollectionStateExtra::None => {
2777                        unreachable!("set_hold_policies is only called for ingestions");
2778                    }
2779                    CollectionStateExtra::Export(export) => (
2780                        export.write_frontier.borrow(),
2781                        &mut export.derived_since,
2782                        &mut export.read_policy,
2783                    ),
2784                };
2785
2786                let new_derived_since = policy.frontier(write_frontier);
2787                let mut update = swap_updates(derived_since, new_derived_since);
2788                if !update.is_empty() {
2789                    read_capability_changes.insert(id, update);
2790                }
2791
2792                *hold_policy = policy;
2793            }
2794        }
2795
2796        if !read_capability_changes.is_empty() {
2797            self.update_hold_capabilities(&mut read_capability_changes);
2798        }
2799    }
2800
2801    #[instrument(level = "debug", fields(updates))]
2802    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2803        let mut read_capability_changes = BTreeMap::default();
2804
2805        if let Some(collection) = self.collections.get_mut(&id) {
2806            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2807                CollectionStateExtra::Ingestion(ingestion) => (
2808                    &mut ingestion.write_frontier,
2809                    &mut ingestion.derived_since,
2810                    &ingestion.hold_policy,
2811                ),
2812                CollectionStateExtra::None => {
2813                    if matches!(collection.data_source, DataSource::Progress) {
2814                        // We do get these, but can't do anything with them!
2815                    } else {
2816                        tracing::error!(
2817                            ?collection,
2818                            ?new_upper,
2819                            "updated write frontier for collection which is not an ingestion"
2820                        );
2821                    }
2822                    return;
2823                }
2824                CollectionStateExtra::Export(export) => (
2825                    &mut export.write_frontier,
2826                    &mut export.derived_since,
2827                    &export.read_policy,
2828                ),
2829            };
2830
2831            if PartialOrder::less_than(write_frontier, new_upper) {
2832                write_frontier.clone_from(new_upper);
2833            }
2834
2835            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2836            let mut update = swap_updates(derived_since, new_derived_since);
2837            if !update.is_empty() {
2838                read_capability_changes.insert(id, update);
2839            }
2840        } else if self.dropped_objects.contains_key(&id) {
2841            // We dropped an object but might still get updates from cluster
2842            // side, before it notices the drop. This is expected and fine.
2843        } else {
2844            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2845        }
2846
2847        if !read_capability_changes.is_empty() {
2848            self.update_hold_capabilities(&mut read_capability_changes);
2849        }
2850    }
2851
2852    // This is different from `update_read_capabilities`, which is for external users.
2853    // This method is for maintaining the read holds that the controller has at
2854    // the StorageCollections, for storage dependencies.
2855    #[instrument(level = "debug", fields(updates))]
2856    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2857        // Location to record consequences that we need to act on.
2858        let mut collections_net = BTreeMap::new();
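        // Entries map a collection to its net `ChangeBatch` of capability changes, the
        // new frontier for its read holds, and the cluster that hosts it.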
2859
2860        // We must not rely on any specific relative ordering of `GlobalId`s.
2861        // That said, it is reasonable to assume that collections generally have
2862        // greater IDs than their dependencies, so starting with the largest is
2863        // a useful optimization.
2864        while let Some(key) = updates.keys().rev().next().cloned() {
2865            let mut update = updates.remove(&key).unwrap();
2866
2867            if key.is_user() {
2868                debug!(id = %key, ?update, "update_hold_capability");
2869            }
2870
2871            if let Some(collection) = self.collections.get_mut(&key) {
2872                match &mut collection.extra_state {
2873                    CollectionStateExtra::Ingestion(ingestion) => {
2874                        let changes = ingestion.read_capabilities.update_iter(update.drain());
2875                        update.extend(changes);
2876
2877                        let (changes, frontier, _cluster_id) =
2878                            collections_net.entry(key).or_insert_with(|| {
2879                                (
2880                                    <ChangeBatch<_>>::new(),
2881                                    Antichain::new(),
2882                                    ingestion.instance_id,
2883                                )
2884                            });
2885
2886                        changes.extend(update.drain());
2887                        *frontier = ingestion.read_capabilities.frontier().to_owned();
2888                    }
2889                    CollectionStateExtra::None => {
2890                        // WIP: See if this ever panics in ci.
2891                        soft_panic_or_log!(
2892                            "trying to update holds for collection {collection:?} which is not \
2893                             an ingestion: {update:?}"
2894                        );
2895                        continue;
2896                    }
2897                    CollectionStateExtra::Export(export) => {
2898                        let changes = export.read_capabilities.update_iter(update.drain());
2899                        update.extend(changes);
2900
2901                        let (changes, frontier, _cluster_id) =
2902                            collections_net.entry(key).or_insert_with(|| {
2903                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
2904                            });
2905
2906                        changes.extend(update.drain());
2907                        *frontier = export.read_capabilities.frontier().to_owned();
2908                    }
2909                }
2910            } else {
2911                // This is confusing and we should probably error.
2912                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
2913            }
2914        }
2915
2916        // Translate our net hold changes into `AllowCompaction` commands and
2917        // downgrade persist sinces.
2918        for (key, (mut changes, frontier, cluster_id)) in collections_net {
2919            if !changes.is_empty() {
2920                if key.is_user() {
2921                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
2922                }
2923
2924                let collection = self
2925                    .collections
2926                    .get_mut(&key)
2927                    .expect("missing collection state");
2928
2929                let read_holds = match &mut collection.extra_state {
2930                    CollectionStateExtra::Ingestion(ingestion) => {
2931                        ingestion.dependency_read_holds.as_mut_slice()
2932                    }
2933                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
2934                    CollectionStateExtra::None => {
2935                        soft_panic_or_log!(
2936                            "trying to downgrade read holds for collection which is not an \
2937                             ingestion: {collection:?}"
2938                        );
2939                        continue;
2940                    }
2941                };
2942
2943                for read_hold in read_holds.iter_mut() {
2944                    read_hold
2945                        .try_downgrade(frontier.clone())
2946                        .expect("we only advance the frontier");
2947                }
2948
2949                // Send AllowCompaction command directly to the instance
2950                if let Some(instance) = self.instances.get_mut(&cluster_id) {
2951                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
2952                } else {
2953                    soft_panic_or_log!(
2954                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
2955                    );
2956                }
2957            }
2958        }
2959    }
2960
2961    /// Validate that a collection exists for all identifiers, and error if any do not.
2962    fn validate_collection_ids(
2963        &self,
2964        ids: impl Iterator<Item = GlobalId>,
2965    ) -> Result<(), StorageError<T>> {
2966        for id in ids {
2967            self.storage_collections.check_exists(id)?;
2968        }
2969        Ok(())
2970    }
2971
2972    /// Validate that an export exists for all identifiers, and error if any do not.
2973    fn validate_export_ids(
2974        &self,
2975        ids: impl Iterator<Item = GlobalId>,
2976    ) -> Result<(), StorageError<T>> {
2977        for id in ids {
2978            self.export(id)?;
2979        }
2980        Ok(())
2981    }
2982
2983    /// Opens a write handle for the given `shard`.
2984    ///
2985    /// The handle's upper is refreshed to its most recent value before it is returned, so
2986    /// callers can rely on it being up to date.
2990    async fn open_data_handles(
2991        &self,
2992        id: &GlobalId,
2993        shard: ShardId,
2994        relation_desc: RelationDesc,
2995        persist_client: &PersistClient,
2996    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
2997        let diagnostics = Diagnostics {
2998            shard_name: id.to_string(),
2999            handle_purpose: format!("controller data for {}", id),
3000        };
3001
3002        let mut write = persist_client
3003            .open_writer(
3004                shard,
3005                Arc::new(relation_desc),
3006                Arc::new(UnitSchema),
3007                diagnostics.clone(),
3008            )
3009            .await
3010            .expect("invalid persist usage");
3011
3012        // N.B.
3013        // Fetch the most recent upper for the write handle. Otherwise, this may be behind
3014        // the since of the since handle. It's vital this happens AFTER we create
3015        // the since handle as it needs to be linearized with that operation. It may be true
3016        // that creating the write handle after the since handle already ensures this, but we
3017        // do this out of an abundance of caution.
3018        //
3019        // Note that this returns the upper, but also sets it on the handle to be fetched later.
3020        write.fetch_recent_upper().await;
3021
3022        write
3023    }
3024
3025    /// Registers the given introspection collection and does any preparatory
3026    /// work that we have to do before we start writing to it. This
3027    /// preparatory work will include partial truncation or other cleanup
3028    /// schemes, depending on introspection type.
3029    fn register_introspection_collection(
3030        &mut self,
3031        id: GlobalId,
3032        introspection_type: IntrospectionType,
3033        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
3034        persist_client: PersistClient,
3035    ) -> Result<(), StorageError<T>> {
3036        tracing::info!(%id, ?introspection_type, "registering introspection collection");
3037
3038        // In read-only mode we create a new shard for all migrated storage collections, so we
3039        // "trick" the write task into thinking that it's not in read-only mode, ensuring that
3040        // something is advancing this new shard.
3041        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
3042        if force_writable {
3043            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
3044            info!("writing to migrated storage collection {id} in read-only mode");
3045        }
3046
3047        let prev = self.introspection_ids.insert(introspection_type, id);
3048        assert!(
3049            prev.is_none(),
3050            "cannot have multiple IDs for introspection type"
3051        );
3052
3053        let metadata = self.storage_collections.collection_metadata(id)?.clone();
3054
3055        let read_handle_fn = move || {
3056            let persist_client = persist_client.clone();
3057            let metadata = metadata.clone();
3058
3059            let fut = async move {
3060                let read_handle = persist_client
3061                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
3062                        metadata.data_shard,
3063                        Arc::new(metadata.relation_desc.clone()),
3064                        Arc::new(UnitSchema),
3065                        Diagnostics {
3066                            shard_name: id.to_string(),
3067                            handle_purpose: format!("snapshot {}", id),
3068                        },
3069                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3070                    )
3071                    .await
3072                    .expect("invalid persist usage");
3073                read_handle
3074            };
3075
3076            fut.boxed()
3077        };
3078
3079        let recent_upper = write_handle.shared_upper();
3080
3081        match CollectionManagerKind::from(&introspection_type) {
3082            // For these, we first register the collection and then prepare it,
3083            // because the code that prepares differential collections expects to
3084            // be able to update desired state via the collection manager
3085            // already.
3086            CollectionManagerKind::Differential => {
3087                // The clones below are cheap, shallow copies.
3088                let introspection_config = DifferentialIntrospectionConfig {
3089                    recent_upper,
3090                    introspection_type,
3091                    storage_collections: Arc::clone(&self.storage_collections),
3092                    collection_manager: self.collection_manager.clone(),
3093                    source_statistics: Arc::clone(&self.source_statistics),
3094                    sink_statistics: Arc::clone(&self.sink_statistics),
3095                    statistics_interval: self.config.parameters.statistics_interval.clone(),
3096                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
3097                    metrics: self.metrics.clone(),
3098                    introspection_tokens: Arc::clone(&self.introspection_tokens),
3099                };
3100                self.collection_manager.register_differential_collection(
3101                    id,
3102                    write_handle,
3103                    read_handle_fn,
3104                    force_writable,
3105                    introspection_config,
3106                );
3107            }
3108            // For these, we first have to prepare and then register with
3109            // collection manager, because the preparation logic wants to read
3110            // the shard's contents and then do uncontested writes.
3111            //
3112            // TODO(aljoscha): We should make the truncation/cleanup work that
3113            // happens when we take over instead be a periodic thing, and make
3114            // it resilient to the upper moving concurrently.
3115            CollectionManagerKind::AppendOnly => {
3116                let introspection_config = AppendOnlyIntrospectionConfig {
3117                    introspection_type,
3118                    config_set: Arc::clone(self.config.config_set()),
3119                    parameters: self.config.parameters.clone(),
3120                    storage_collections: Arc::clone(&self.storage_collections),
3121                };
3122                self.collection_manager.register_append_only_collection(
3123                    id,
3124                    write_handle,
3125                    force_writable,
3126                    Some(introspection_config),
3127                );
3128            }
3129        }
3130
3131        Ok(())
3132    }
3133
3134    /// Remove statistics for sources/sinks that were dropped but still have statistics rows
3135    /// hanging around.
3136    fn reconcile_dangling_statistics(&self) {
3137        self.source_statistics
3138            .lock()
3139            .expect("poisoned")
3140            .source_statistics
3141            // collections should also contain subsources.
3142            .retain(|k, _| self.storage_collections.check_exists(*k).is_ok());
3143        self.sink_statistics
3144            .lock()
3145            .expect("poisoned")
3146            .retain(|k, _| self.export(*k).is_ok());
3147    }
3148
3149    /// Appends a new global ID, shard ID pair to the appropriate collection.
3150    /// Use a `diff` of 1 to append a new entry; -1 to retract an existing
3151    /// entry.
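    ///
    /// For example (illustrative only): registering a collection appends a
    /// `(<global_id>, <shard_id>)` row with diff `1`; dropping it later appends the
    /// same row with diff `-1`, retracting the mapping.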
3152    ///
3153    /// # Panics
3154    /// - If `self.collections` does not have an entry for `global_id`.
3155    /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
3156    ///   a managed collection.
3157    /// - If diff is any value other than `1` or `-1`.
3158    #[instrument(level = "debug")]
3159    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3160    where
3161        I: Iterator<Item = GlobalId>,
3162    {
3163        mz_ore::soft_assert_or_log!(
3164            diff == Diff::MINUS_ONE || diff == Diff::ONE,
3165            "use 1 for insert or -1 for delete"
3166        );
3167
3168        let id = *self
3169            .introspection_ids
3170            .get(&IntrospectionType::ShardMapping)
3171            .expect("should be registered before this call");
3172
3173        let mut updates = vec![];
3174        // Pack updates into rows
3175        let mut row_buf = Row::default();
3176
3177        for global_id in global_ids {
3178            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3179                collection.collection_metadata.data_shard.clone()
3180            } else {
3181                panic!("unknown global id: {}", global_id);
3182            };
3183
3184            let mut packer = row_buf.packer();
3185            packer.push(Datum::from(global_id.to_string().as_str()));
3186            packer.push(Datum::from(shard_id.to_string().as_str()));
3187            updates.push((row_buf.clone(), diff));
3188        }
3189
3190        self.collection_manager.differential_append(id, updates);
3191    }
3192
3193    /// Determines and returns this collection's dependencies, if any.
3194    fn determine_collection_dependencies(
3195        &self,
3196        self_id: GlobalId,
3197        data_source: &DataSource<T>,
3198    ) -> Result<Vec<GlobalId>, StorageError<T>> {
3199        let dependency = match &data_source {
3200            DataSource::Introspection(_)
3201            | DataSource::Webhook
3202            | DataSource::Table { primary: None }
3203            | DataSource::Progress
3204            | DataSource::Other => vec![],
3205            DataSource::Table {
3206                primary: Some(primary),
3207            } => vec![*primary],
3208            DataSource::IngestionExport { ingestion_id, .. } => {
3209                // Ingestion exports depend on their primary source's remap
3210                // collection.
3211                let source_collection = self.collection(*ingestion_id)?;
3212                let ingestion_remap_collection_id = match &source_collection.data_source {
3213                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3214                    _ => unreachable!(
3215                        "SourceExport must only refer to primary sources that already exist"
3216                    ),
3217                };
3218
3219                // Ingestion exports (aka. subsources) must make sure that 1)
3220                // their own collection's since stays one step behind the upper,
3221                // and, 2) that the remap shard's since stays one step behind
3222                // their upper. Hence they track themselves and the remap shard
3223                // as dependencies.
3224                vec![self_id, ingestion_remap_collection_id]
3225            }
3226            // Ingestions depend on their remap collection.
3227            DataSource::Ingestion(ingestion) => {
3228                // Ingestions must make sure that 1) their own collection's
3229                // since stays one step behind the upper, and, 2) that the remap
3230                // shard's since stays one step behind their upper. Hence they
3231                // track themselves and the remap shard as dependencies.
3232                vec![self_id, ingestion.remap_collection_id]
3233            }
3234            DataSource::Sink { desc } => {
3235                // Sinks hold back their own frontier and the frontier of their input.
3236                vec![self_id, desc.sink.from]
3237            }
3238        };
3239
3240        Ok(dependency)
3241    }
3242
3243    async fn read_handle_for_snapshot(
3244        &self,
3245        id: GlobalId,
3246    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3247        let metadata = self.storage_collections.collection_metadata(id)?;
3248        read_handle_for_snapshot(&self.persist, id, &metadata).await
3249    }
3250
3251    /// Handles writing of status updates for sources/sinks to the appropriate
3252    /// status relation
3253    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3254        if self.read_only {
3255            return;
3256        }
3257
3258        let mut sink_status_updates = vec![];
3259        let mut source_status_updates = vec![];
3260
3261        for update in updates {
3262            let id = update.id;
3263            if self.export(id).is_ok() {
3264                sink_status_updates.push(update);
3265            } else if self.storage_collections.check_exists(id).is_ok() {
3266                source_status_updates.push(update);
3267            }
3268        }
3269
3270        self.append_status_introspection_updates(
3271            IntrospectionType::SourceStatusHistory,
3272            source_status_updates,
3273        );
3274        self.append_status_introspection_updates(
3275            IntrospectionType::SinkStatusHistory,
3276            sink_status_updates,
3277        );
3278    }
3279
3280    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3281        self.collections
3282            .get(&id)
3283            .ok_or(StorageError::IdentifierMissing(id))
3284    }
3285
3286    /// Runs the identified ingestion using the current definition of the
3287    /// ingestion in-memory.
3288    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3289        tracing::info!(%id, "starting ingestion");
3290
3291        let collection = self.collection(id)?;
3292        let ingestion_description = match &collection.data_source {
3293            DataSource::Ingestion(i) => i.clone(),
3294            _ => {
3295                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3296                Err(StorageError::IdentifierInvalid(id))?
3297            }
3298        };
3299
3300        // Enrich all of the exports with their metadata
3301        let mut source_exports = BTreeMap::new();
3302        for (
3303            export_id,
3304            SourceExport {
3305                storage_metadata: (),
3306                details,
3307                data_config,
3308            },
3309        ) in ingestion_description.source_exports.clone()
3310        {
3311            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3312            source_exports.insert(
3313                export_id,
3314                SourceExport {
3315                    storage_metadata: export_storage_metadata,
3316                    details,
3317                    data_config,
3318                },
3319            );
3320        }
3321
3322        let description = IngestionDescription::<CollectionMetadata> {
3323            source_exports,
3324            // The ingestion metadata is simply the collection metadata of the collection
3325            // associated with this ingestion.
3326            ingestion_metadata: collection.collection_metadata.clone(),
3327            // The rest of the fields are identical
3328            desc: ingestion_description.desc.clone(),
3329            instance_id: ingestion_description.instance_id,
3330            remap_collection_id: ingestion_description.remap_collection_id,
3331        };
3332
3333        let storage_instance_id = description.instance_id;
3334        // Fetch the client for this ingestion's instance.
3335        let instance = self
3336            .instances
3337            .get_mut(&storage_instance_id)
3338            .ok_or_else(|| StorageError::IngestionInstanceMissing {
3339                storage_instance_id,
3340                ingestion_id: id,
3341            })?;
3342
3343        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3344        instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3345
3346        Ok(())
3347    }
3348
3349    /// Runs the identified export using the current definition of the export
3350    /// that we have in memory.
3351    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3352        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3353            return Err(StorageError::IdentifierMissing(id));
3354        };
3355
3356        let from_storage_metadata = self
3357            .storage_collections
3358            .collection_metadata(description.sink.from)?;
3359        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3360
3361        // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
3362        // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
3363        // reading it; otherwise assume we may have to replay from the beginning.
3364        let enable_snapshot_frontier =
3365            dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER.get(self.config().config_set());
3366        let export_state = self.storage_collections.collection_frontiers(id)?;
3367        let mut as_of = description.sink.as_of.clone();
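        // The stored as-of may be stale; advance it to at least the collection's current
        // read capability, since times before that are no longer readable.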
3368        as_of.join_assign(&export_state.implied_capability);
3369        let with_snapshot = if enable_snapshot_frontier
3370            && PartialOrder::less_than(&as_of, &export_state.write_frontier)
3371        {
3372            false
3373        } else {
3374            description.sink.with_snapshot
3375        };
3376
3377        info!(
3378            sink_id = %id,
3379            from_id = %description.sink.from,
3380            write_frontier = ?export_state.write_frontier,
3381            ?as_of,
3382            ?with_snapshot,
3383            "run_export"
3384        );
3385
3386        let cmd = RunSinkCommand {
3387            id,
3388            description: StorageSinkDesc {
3389                from: description.sink.from,
3390                from_desc: description.sink.from_desc.clone(),
3391                connection: description.sink.connection.clone(),
3392                envelope: description.sink.envelope,
3393                as_of,
3394                version: description.sink.version,
3395                from_storage_metadata,
3396                with_snapshot,
3397                to_storage_metadata,
3398            },
3399        };
3400
3401        let storage_instance_id = description.instance_id.clone();
3402
3403        let instance = self
3404            .instances
3405            .get_mut(&storage_instance_id)
3406            .ok_or_else(|| StorageError::ExportInstanceMissing {
3407                storage_instance_id,
3408                export_id: id,
3409            })?;
3410
3411        instance.send(StorageCommand::RunSink(Box::new(cmd)));
3412
3413        Ok(())
3414    }
3415
3416    /// Update introspection with the current frontiers of storage objects.
3417    ///
3418    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3419    /// second during normal operation.
3420    fn update_frontier_introspection(&mut self) {
3421        let mut global_frontiers = BTreeMap::new();
3422        let mut replica_frontiers = BTreeMap::new();
3423
3424        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
3425            let id = collection_frontiers.id;
3426            let since = collection_frontiers.read_capabilities;
3427            let upper = collection_frontiers.write_frontier;
3428
3429            let instance = self
3430                .collections
3431                .get(&id)
3432                .and_then(|collection_state| match &collection_state.extra_state {
3433                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
3434                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
3435                    CollectionStateExtra::None => None,
3436                })
3437                .and_then(|i| self.instances.get(&i));
3438
3439            if let Some(instance) = instance {
3440                for replica_id in instance.replica_ids() {
3441                    replica_frontiers.insert((id, replica_id), upper.clone());
3442                }
3443            }
3444
3445            global_frontiers.insert(id, (since, upper));
3446        }
3447
3448        let mut global_updates = Vec::new();
3449        let mut replica_updates = Vec::new();
3450
3451        let mut push_global_update =
3452            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
3453                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
3454                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3455                let row = Row::pack_slice(&[
3456                    Datum::String(&id.to_string()),
3457                    read_frontier,
3458                    write_frontier,
3459                ]);
3460                global_updates.push((row, diff));
3461            };
3462
3463        let mut push_replica_update =
3464            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
3465                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3466                let row = Row::pack_slice(&[
3467                    Datum::String(&id.to_string()),
3468                    Datum::String(&replica_id.to_string()),
3469                    write_frontier,
3470                ]);
3471                replica_updates.push((row, diff));
3472            };
3473
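        // Diff the newly observed frontiers against the previously recorded ones,
        // emitting insertions for new or changed entries and retractions for stale ones.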
3474        let mut old_global_frontiers =
3475            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
3476        for (&id, new) in &self.recorded_frontiers {
3477            match old_global_frontiers.remove(&id) {
3478                Some(old) if &old != new => {
3479                    push_global_update(id, new.clone(), Diff::ONE);
3480                    push_global_update(id, old, Diff::MINUS_ONE);
3481                }
3482                Some(_) => (),
3483                None => push_global_update(id, new.clone(), Diff::ONE),
3484            }
3485        }
3486        for (id, old) in old_global_frontiers {
3487            push_global_update(id, old, Diff::MINUS_ONE);
3488        }
3489
3490        let mut old_replica_frontiers =
3491            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
3492        for (&key, new) in &self.recorded_replica_frontiers {
3493            match old_replica_frontiers.remove(&key) {
3494                Some(old) if &old != new => {
3495                    push_replica_update(key, new.clone(), Diff::ONE);
3496                    push_replica_update(key, old, Diff::MINUS_ONE);
3497                }
3498                Some(_) => (),
3499                None => push_replica_update(key, new.clone(), Diff::ONE),
3500            }
3501        }
3502        for (key, old) in old_replica_frontiers {
3503            push_replica_update(key, old, Diff::MINUS_ONE);
3504        }
3505
3506        let id = self.introspection_ids[&IntrospectionType::Frontiers];
3507        self.collection_manager
3508            .differential_append(id, global_updates);
3509
3510        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
3511        self.collection_manager
3512            .differential_append(id, replica_updates);
3513    }
3514
3515    /// Refresh the wallclock lag introspection and metrics with the current lag values.
3516    ///
3517    /// This method produces wallclock lag metrics of two different shapes:
3518    ///
3519    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
3520    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
3521    ///   over the last minute, together with the current time.
3522    /// * Histograms: For each collection, we measure the lag of the write frontier behind
3523    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
3524    ///   together with the current histogram period.
3525    ///
3526    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
3527    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
3528    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
3529    /// Prometheus.
3530    ///
3531    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
3532    /// second during normal operation.
3533    fn refresh_wallclock_lag(&mut self) {
3534        let now_ms = (self.now)();
3535        let histogram_period =
3536            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3537
3538        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
3539            Some(ts) => (self.wallclock_lag)(ts.clone()),
3540            None => Duration::ZERO,
3541        };
3542
3543        for frontiers in self.storage_collections.active_collection_frontiers() {
3544            let id = frontiers.id;
3545            let Some(collection) = self.collections.get_mut(&id) else {
3546                continue;
3547            };
3548
3549            let collection_unreadable =
3550                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
3551            let lag = if collection_unreadable {
3552                WallclockLag::Undefined
3553            } else {
3554                let lag = frontier_lag(&frontiers.write_frontier);
3555                WallclockLag::Seconds(lag.as_secs())
3556            };
3557
3558            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);
3559
3560            // No way to specify values as undefined in Prometheus metrics, so we use the
3561            // maximum value instead.
3562            let secs = lag.unwrap_seconds_or(u64::MAX);
3563            collection.wallclock_lag_metrics.observe(secs);
3564
3565            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
3566                continue;
3567            }
3568
3569            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
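                // Bucket lags by rounding up to the next power of two (in seconds).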
3570                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());
3571
3572                let instance_id = match &collection.extra_state {
3573                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3574                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3575                    CollectionStateExtra::None => None,
3576                };
3577                let workload_class = instance_id
3578                    .and_then(|id| self.instances.get(&id))
3579                    .and_then(|i| i.workload_class.clone());
3580                let labels = match workload_class {
3581                    Some(wc) => [("workload_class", wc.clone())].into(),
3582                    None => BTreeMap::new(),
3583                };
3584
3585                let key = (histogram_period, bucket, labels);
3586                *stash.entry(key).or_default() += Diff::ONE;
3587            }
3588        }
3589
3590        // Record lags to persist, if it's time.
3591        self.maybe_record_wallclock_lag();
3592    }
3593
3594    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
3595    /// last recording.
3596    ///
3597    /// We emit new introspection updates if the system time has passed into a new multiple of the
3598    /// recording interval (typically 1 minute) since the last refresh. The compute controller uses
3599    /// the same approach, ensuring that both controllers commit their lags at roughly the same
3600    /// time, avoiding confusion caused by inconsistencies.
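    ///
    /// For example (illustrative only): with a 1-minute interval, refreshes at 12:34:07
    /// and 12:34:41 both truncate to 12:34:00 and are skipped if 12:34:00 was already
    /// recorded; the first refresh at or after 12:35:00 then emits new updates.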
3601    fn maybe_record_wallclock_lag(&mut self) {
3602        if self.read_only {
3603            return;
3604        }
3605
3606        let duration_trunc = |datetime: DateTime<_>, interval| {
3607            let td = TimeDelta::from_std(interval).ok()?;
3608            datetime.duration_trunc(td).ok()
3609        };
3610
3611        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3612        let now_dt = mz_ore::now::to_datetime((self.now)());
3613        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3614            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3615            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3616            duration_trunc(now_dt, *default).unwrap()
3617        });
3618        if now_trunc <= self.wallclock_lag_last_recorded {
3619            return;
3620        }
3621
3622        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3623
3624        let mut history_updates = Vec::new();
3625        let mut histogram_updates = Vec::new();
3626        let mut row_buf = Row::default();
3627        for frontiers in self.storage_collections.active_collection_frontiers() {
3628            let id = frontiers.id;
3629            let Some(collection) = self.collections.get_mut(&id) else {
3630                continue;
3631            };
3632
3633            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
3634            let row = Row::pack_slice(&[
3635                Datum::String(&id.to_string()),
3636                Datum::Null,
3637                max_lag.into_interval_datum(),
3638                Datum::TimestampTz(now_ts),
3639            ]);
3640            history_updates.push((row, Diff::ONE));
3641
3642            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3643                continue;
3644            };
3645
3646            for ((period, lag, labels), count) in std::mem::take(stash) {
3647                let mut packer = row_buf.packer();
3648                packer.extend([
3649                    Datum::TimestampTz(period.start),
3650                    Datum::TimestampTz(period.end),
3651                    Datum::String(&id.to_string()),
3652                    lag.into_uint64_datum(),
3653                ]);
3654                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3655                packer.push_dict(labels);
3656
3657                histogram_updates.push((row_buf.clone(), count));
3658            }
3659        }
3660
3661        if !history_updates.is_empty() {
3662            self.append_introspection_updates(
3663                IntrospectionType::WallclockLagHistory,
3664                history_updates,
3665            );
3666        }
3667        if !histogram_updates.is_empty() {
3668            self.append_introspection_updates(
3669                IntrospectionType::WallclockLagHistogram,
3670                histogram_updates,
3671            );
3672        }
3673
3674        self.wallclock_lag_last_recorded = now_trunc;
3675    }
3676
3677    /// Run periodic tasks.
3678    ///
3679    /// This method is invoked roughly once per second during normal operation. It is a good place
3680    /// for tasks that need to run periodically, such as state cleanup or updating of metrics.
    fn maintain(&mut self) {
        self.update_frontier_introspection();
        self.refresh_wallclock_lag();

        // Perform instance maintenance work.
        for instance in self.instances.values_mut() {
            instance.refresh_state_metrics();
        }
    }
}

impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}
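
// Illustrative use of the conversion above (a sketch, not an actual call site
// in this file):
//
//   let kind = CollectionManagerKind::from(&IntrospectionType::Frontiers);
//   assert!(matches!(kind, CollectionManagerKind::Differential));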

/// Get the current rows in the given statistics table. This is used to bootstrap
/// the statistics tasks.
///
// TODO(guswynn): we need to be more careful about the update time we get here:
// <https://github.com/MaterializeInc/database-issues/issues/7564>
async fn snapshot_statistics<T>(
    id: GlobalId,
    upper: Antichain<T>,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Vec<Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    match upper.as_option() {
        Some(f) if f > &T::minimum() => {
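            // The upper is the first timestamp that is not yet complete, so
            // stepping it back once yields the largest readable time: e.g. an
            // upper of `[t + 1]` lets us take a snapshot at `as_of = t`.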
            let as_of = f.step_back().unwrap();

            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
            snapshot
                .into_iter()
                .map(|(row, diff)| {
                    assert_eq!(diff, 1);
                    row
                })
                .collect()
        }
        // If the collection is closed or the frontier is the minimum, we cannot
        // or don't need to read a snapshot (respectively).
        _ => Vec::new(),
    }
}

async fn read_handle_for_snapshot<T>(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    // We create a new read handle every time someone requests a snapshot and then immediately
    // expire it, instead of keeping a read handle permanently in our state, to avoid having it
    // heartbeat continuously. The assumption is that calls to snapshot are rare, so it is
    // worth creating a new handle each time.
    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}
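
// A minimal usage sketch (illustrative only; error handling is elided and the
// `as_of` frontier is assumed to be supplied by the caller):
//
//   let mut handle = read_handle_for_snapshot::<T>(&persist, id, &metadata).await?;
//   let contents = handle.snapshot_and_fetch(as_of).await;
//   handle.expire().await;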

/// State maintained about individual collections.
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The source of this collection's data.
    pub data_source: DataSource<T>,

    /// Metadata describing how this collection's data is persisted.
    pub collection_metadata: CollectionMetadata,

    /// Additional state maintained for specific collection types, such as
    /// ingestions and exports.
    pub extra_state: CollectionStateExtra<T>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    wallclock_lag_max: WallclockLag,
    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
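    ///
    /// A sketch of how a single observation would be recorded (assuming `Diff`
    /// has a `ZERO` constant and supports `AddAssign`; the actual recording
    /// code lives elsewhere in this file):
    ///
    /// ```ignore
    /// *stash.entry((period, lag, labels)).or_insert(Diff::ZERO) += Diff::ONE;
    /// ```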
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    /// Frontier wallclock lag metrics tracked for this collection.
    wallclock_lag_metrics: WallclockLagMetrics,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    fn new(
        data_source: DataSource<T>,
        collection_metadata: CollectionMetadata,
        extra_state: CollectionStateExtra<T>,
        wallclock_lag_metrics: WallclockLagMetrics,
    ) -> Self {
        // Only collect wallclock lag histogram data for collections written by storage, to avoid
        // duplicate measurements. Collections written by other components (e.g. compute) have
        // their wallclock lags recorded by those components.
        let wallclock_lag_histogram_stash = match &data_source {
            DataSource::Other => None,
            _ => Some(Default::default()),
        };

        Self {
            data_source,
            collection_metadata,
            extra_state,
            wallclock_lag_max: WallclockLag::MIN,
            wallclock_lag_histogram_stash,
            wallclock_lag_metrics,
        }
    }
}

/// Additional state that the controller maintains for select collection types.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    Ingestion(IngestionState<T>),
    Export(ExportState<T>),
    None,
}

/// State maintained about ingestions and ingestion exports.
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Only used to keep track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The current since frontier, derived from `write_frontier` using
    /// `hold_policy`.
    pub derived_since: Antichain<T>,

    /// Holds that this ingestion (or ingestion export) has on its dependencies.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// The policy that drives how we downgrade our read hold. That is, how we
    /// derive our since from our upper.
    ///
    /// This is a _storage-controller-internal_ policy used to derive the
    /// controller's own read hold on the collection. It should not be confused
    /// with any read policies that the adapter might install at
    /// [StorageCollections].
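    ///
    /// For example, with a lag-based policy and an upper of `[100]`, the
    /// derived since might sit around `[90]`; the exact values depend on the
    /// policy's lag and rounding, so these numbers are illustrative only.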
    pub hold_policy: ReadPolicy<T>,

    /// The ID of the instance in which the ingestion is running.
    pub instance_id: StorageInstanceId,

    /// Set of replica IDs on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}

/// A description of a status history collection.
///
/// Used to inform partial truncation; see
/// [`collection_mgmt::partially_truncate_status_history`].
struct StatusHistoryDesc<K> {
    retention_policy: StatusHistoryRetentionPolicy,
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}

enum StatusHistoryRetentionPolicy {
    /// Truncates everything but the last N updates for each key.
    LastN(usize),
    /// Truncates everything past the time window for each key.
    TimeWindow(Duration),
}
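
// Illustrative values for the two retention policies (a sketch, not the
// configured defaults):
//
//   // keep only the 3 most recent status updates per key
//   StatusHistoryRetentionPolicy::LastN(3)
//   // keep only updates whose `occurred_at` lies within the last 30 days
//   StatusHistoryRetentionPolicy::TimeWindow(Duration::from_secs(30 * 24 * 60 * 60))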

fn source_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_source_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn sink_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_sink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_privatelink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
    let desc = &REPLICA_STATUS_HISTORY_DESC;
    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
            params.replica_status_history_retention_window,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
                datums[process_idx].unwrap_uint64(),
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

/// Replace one antichain with another, tracking the overall changes in the returned `ChangeBatch`.
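/// # Example
///
/// A minimal sketch (not compiled as a doctest; `u64` timestamps are assumed
/// for illustration):
///
/// ```ignore
/// let mut from = Antichain::from_elem(3u64);
/// let batch = swap_updates(&mut from, Antichain::from_elem(5u64));
/// // `from` is now `{5}`, and `batch` records `(5, +1)` and `(3, -1)`.
/// ```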
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}