// mz_storage_controller/lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the storage controller trait.
11
12use std::any::Any;
13use std::collections::btree_map;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::{Debug, Display};
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use crate::collection_mgmt::{
21    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
22};
23use crate::instance::{Instance, ReplicaConfig};
24use async_trait::async_trait;
25use chrono::{DateTime, DurationRound, TimeDelta, Utc};
26use derivative::Derivative;
27use differential_dataflow::lattice::Lattice;
28use futures::FutureExt;
29use futures::StreamExt;
30use itertools::Itertools;
31use mz_build_info::BuildInfo;
32use mz_cluster_client::client::ClusterReplicaLocation;
33use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_controller_types::dyncfgs::{
36    ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
37};
38use mz_ore::collections::CollectionExt;
39use mz_ore::metrics::MetricsRegistry;
40use mz_ore::now::{EpochMillis, NowFn};
41use mz_ore::task::AbortOnDropHandle;
42use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
43use mz_persist_client::batch::ProtoBatch;
44use mz_persist_client::cache::PersistClientCache;
45use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
46use mz_persist_client::critical::Opaque;
47use mz_persist_client::read::ReadHandle;
48use mz_persist_client::schema::CaESchema;
49use mz_persist_client::write::WriteHandle;
50use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
51use mz_persist_types::Codec64;
52use mz_persist_types::codec_impls::UnitSchema;
53use mz_repr::adt::timestamp::CheckedTimestamp;
54use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
55use mz_storage_client::client::{
56    AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
57    StatusUpdate, StorageCommand, StorageResponse, TableData,
58};
59use mz_storage_client::controller::{
60    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
61    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
62    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
63};
64use mz_storage_client::healthcheck::{
65    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
66    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
67};
68use mz_storage_client::metrics::StorageControllerMetrics;
69use mz_storage_client::statistics::{
70    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
71};
72use mz_storage_client::storage_collections::StorageCollections;
73use mz_storage_types::configuration::StorageConfiguration;
74use mz_storage_types::connections::ConnectionContext;
75use mz_storage_types::connections::inline::InlinedConnection;
76use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
77use mz_storage_types::errors::CollectionMissing;
78use mz_storage_types::instances::StorageInstanceId;
79use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
80use mz_storage_types::parameters::StorageParameters;
81use mz_storage_types::read_holds::ReadHold;
82use mz_storage_types::read_policy::ReadPolicy;
83use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
84use mz_storage_types::sources::{
85    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
86    SourceExport, SourceExportDataConfig,
87};
88use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
89use mz_txn_wal::metrics::Metrics as TxnMetrics;
90use mz_txn_wal::txn_read::TxnsRead;
91use mz_txn_wal::txns::TxnsHandle;
92use timely::order::{PartialOrder, TotalOrder};
93use timely::progress::Timestamp as TimelyTimestamp;
94use timely::progress::frontier::MutableAntichain;
95use timely::progress::{Antichain, ChangeBatch, Timestamp};
96use tokio::sync::watch::{Sender, channel};
97use tokio::sync::{mpsc, oneshot};
98use tokio::time::MissedTickBehavior;
99use tokio::time::error::Elapsed;
100use tracing::{debug, info, warn};
101
102mod collection_mgmt;
103mod history;
104mod instance;
105mod persist_handles;
106mod rtr;
107mod statistics;
108
/// State tracked for an in-flight oneshot ingestion, so the controller can
/// later deliver its results or cancel it.
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to provide results of the ingestion.
    // Closures are not `Debug`, so we exclude this from the derived impl.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// Cluster currently running this ingestion
    cluster_id: StorageInstanceId,
}
118
119impl PendingOneshotIngestion {
120    /// Consume the pending ingestion, responding with a cancelation message.
121    ///
122    /// TODO(cf2): Refine these error messages so they're not stringly typed.
123    pub(crate) fn cancel(self) {
124        (self.result_tx)(vec![Err("canceled".to_string())])
125    }
126}
127
/// A storage controller for a storage instance.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,

    /// Whether or not this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,

    /// Collections maintained by the storage controller.
    ///
    /// This collection only grows, although individual collections may be rendered unusable.
    /// This is to prevent the re-binding of identifiers to other descriptions.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Map from IDs of objects that have been dropped to replicas we are still
    /// expecting DroppedId messages from. This is cleared out once all replicas
    /// have responded.
    ///
    /// We use this only to catch problems in the protocol between controller
    /// and replicas, for example we can differentiate between late messages for
    /// objects that have already been dropped and unexpected (read erroneous)
    /// messages from the replica.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Write handle for table shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,
    /// Metrics for the txn-wal system.
    txns_metrics: Arc<TxnMetrics>,
    /// Storage responses, paired with the replica they originated from (if
    /// any), stashed for later processing.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Channel for sending table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Channel for receiving table handle drops.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Closures that can be used to send responses from oneshot ingestions.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Tracks which collection is responsible for which [`IntrospectionType`].
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for tasks that drive updating introspection collections. Dropping
    /// this will make sure that any tasks (or other resources) will stop when
    /// needed.
    // TODO(aljoscha): Should these live somewhere else?
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    // The following two fields must always be locked in order.
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s, as well
    /// as webhook statistics.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Consolidated metrics updates to periodically write. We do not eagerly initialize this,
    /// and its contents are entirely driven by `StorageResponse::StatisticsUpdates`'s.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// A way to update the statistics interval in the statistics tasks.
    statistics_interval_sender: Sender<Duration>,

    /// Clients for all known storage instances.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration to apply to newly provisioned instances, and use during purification.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections
    persist: Arc<PersistClientCache>,
    /// Metrics of the Storage controller
    metrics: StorageControllerMetrics,
    /// `(read, write)` frontiers that have been recorded in the `Frontiers` collection, kept to be
    /// able to retract old rows.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// Write frontiers that have been recorded in the `ReplicaFrontiers` collection, kept to be
    /// able to retract old rows.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function that computes the lag between the given time and wallclock time.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to a [StorageCollections].
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Migrated storage collections that can be written even in read only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Shared transmit channel for replicas to send responses.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receive end for replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task run at startup to warm persist state.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}
242
243/// Warm up persist state for `shard_ids` in a background task.
244///
245/// With better parallelism during startup this would likely be unnecessary, but empirically we see
246/// some nice speedups with this relatively simple function.
247fn warm_persist_state_in_background(
248    client: PersistClient,
249    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
250) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
251    /// Bound the number of shards that we warm at a single time, to limit our overall resource use.
252    const MAX_CONCURRENT_WARMS: usize = 16;
253    let logic = async move {
254        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
255            .map(|shard_id| {
256                let client = client.clone();
257                async move {
258                    client
259                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
260                            shard_id,
261                            Arc::new(RelationDesc::empty()),
262                            Arc::new(UnitSchema),
263                            true,
264                            Diagnostics::from_purpose("warm persist load state"),
265                        )
266                        .await
267                }
268            })
269            .buffer_unordered(MAX_CONCURRENT_WARMS)
270            .collect()
271            .await;
272        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
273        fetchers
274    };
275    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
276}
277
278#[async_trait(?Send)]
279impl<T> StorageController for Controller<T>
280where
281    T: Timestamp
282        + Lattice
283        + TotalOrder
284        + Codec64
285        + From<EpochMillis>
286        + TimestampManipulation
287        + Into<Datum<'static>>
288        + Display,
289{
290    type Timestamp = T;
291
292    fn initialization_complete(&mut self) {
293        self.reconcile_dangling_statistics();
294        self.initialized = true;
295
296        for instance in self.instances.values_mut() {
297            instance.send(StorageCommand::InitializationComplete);
298        }
299    }
300
301    fn update_parameters(&mut self, config_params: StorageParameters) {
302        self.storage_collections
303            .update_parameters(config_params.clone());
304
305        // We serialize the dyncfg updates in StorageParameters, but configure
306        // persist separately.
307        self.persist.cfg().apply_from(&config_params.dyncfg_updates);
308
309        for instance in self.instances.values_mut() {
310            let params = Box::new(config_params.clone());
311            instance.send(StorageCommand::UpdateConfiguration(params));
312        }
313        self.config.update(config_params);
314        self.statistics_interval_sender
315            .send_replace(self.config.parameters.statistics_interval);
316        self.collection_manager.update_user_batch_duration(
317            self.config
318                .parameters
319                .user_storage_managed_collections_batch_duration,
320        );
321    }
322
    /// Get the current storage configuration.
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }
327
    /// Returns the metadata for the identified collection, delegating to the
    /// shared [StorageCollections] handle.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        self.storage_collections.collection_metadata(id)
    }
331
332    fn collection_hydrated(
333        &self,
334        collection_id: GlobalId,
335    ) -> Result<bool, StorageError<Self::Timestamp>> {
336        let collection = self.collection(collection_id)?;
337
338        let instance_id = match &collection.data_source {
339            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
340            DataSource::IngestionExport { ingestion_id, .. } => {
341                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
342
343                let instance_id = match &ingestion_state.data_source {
344                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
345                    _ => unreachable!("SourceExport must only refer to primary source"),
346                };
347
348                instance_id
349            }
350            _ => return Ok(true),
351        };
352
353        let instance = self.instances.get(&instance_id).ok_or_else(|| {
354            StorageError::IngestionInstanceMissing {
355                storage_instance_id: instance_id,
356                ingestion_id: collection_id,
357            }
358        })?;
359
360        if instance.replica_ids().next().is_none() {
361            // Ingestions on zero-replica clusters are always considered
362            // hydrated.
363            return Ok(true);
364        }
365
366        match &collection.extra_state {
367            CollectionStateExtra::Ingestion(ingestion_state) => {
368                // An ingestion is hydrated if it is hydrated on at least one replica.
369                Ok(ingestion_state.hydrated_on.len() >= 1)
370            }
371            CollectionStateExtra::Export(_) => {
372                // For now, sinks are always considered hydrated. We rely on
373                // them starting up "instantly" and don't wait for them when
374                // checking hydration status of a replica.  TODO(sinks): base
375                // this off of the sink shard's frontier?
376                Ok(true)
377            }
378            CollectionStateExtra::None => {
379                // For now, objects that are not ingestions are always
380                // considered hydrated. This is tables and webhooks, as of
381                // today.
382                Ok(true)
383            }
384        }
385    }
386
387    #[mz_ore::instrument(level = "debug")]
388    fn collections_hydrated_on_replicas(
389        &self,
390        target_replica_ids: Option<Vec<ReplicaId>>,
391        target_cluster_id: &StorageInstanceId,
392        exclude_collections: &BTreeSet<GlobalId>,
393    ) -> Result<bool, StorageError<Self::Timestamp>> {
394        // If an empty set of replicas is provided there can be
395        // no collections on them, we'll count this as hydrated.
396        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
397            return Ok(true);
398        }
399
400        // If target_replica_ids is provided, use it as the set of target
401        // replicas. Otherwise check for hydration on any replica.
402        let target_replicas: Option<BTreeSet<ReplicaId>> =
403            target_replica_ids.map(|ids| ids.into_iter().collect());
404
405        let mut all_hydrated = true;
406        for (collection_id, collection_state) in self.collections.iter() {
407            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
408                continue;
409            }
410            let hydrated = match &collection_state.extra_state {
411                CollectionStateExtra::Ingestion(state) => {
412                    if &state.instance_id != target_cluster_id {
413                        continue;
414                    }
415                    match &target_replicas {
416                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
417                        None => {
418                            // Not target replicas, so check that it's hydrated
419                            // on at least one replica.
420                            state.hydrated_on.len() >= 1
421                        }
422                    }
423                }
424                CollectionStateExtra::Export(_) => {
425                    // For now, sinks are always considered hydrated. We rely on
426                    // them starting up "instantly" and don't wait for them when
427                    // checking hydration status of a replica.  TODO(sinks):
428                    // base this off of the sink shard's frontier?
429                    true
430                }
431                CollectionStateExtra::None => {
432                    // For now, objects that are not ingestions are always
433                    // considered hydrated. This is tables and webhooks, as of
434                    // today.
435                    true
436                }
437            };
438            if !hydrated {
439                tracing::info!(%collection_id, "collection is not hydrated on any replica");
440                all_hydrated = false;
441                // We continue with our loop instead of breaking out early, so
442                // that we log all non-hydrated replicas.
443            }
444        }
445        Ok(all_hydrated)
446    }
447
448    fn collection_frontiers(
449        &self,
450        id: GlobalId,
451    ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing> {
452        let frontiers = self.storage_collections.collection_frontiers(id)?;
453        Ok((frontiers.implied_capability, frontiers.write_frontier))
454    }
455
456    fn collections_frontiers(
457        &self,
458        mut ids: Vec<GlobalId>,
459    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, CollectionMissing> {
460        let mut result = vec![];
461        // In theory, we could pull all our frontiers from storage collections...
462        // but in practice those frontiers may not be identical. For historical reasons, we use the
463        // locally-tracked frontier for sinks but the storage-collections-maintained frontier for
464        // sources.
465        ids.retain(|&id| match self.export(id) {
466            Ok(export) => {
467                result.push((
468                    id,
469                    export.input_hold().since().clone(),
470                    export.write_frontier.clone(),
471                ));
472                false
473            }
474            Err(_) => true,
475        });
476        result.extend(
477            self.storage_collections
478                .collections_frontiers(ids)?
479                .into_iter()
480                .map(|frontiers| {
481                    (
482                        frontiers.id,
483                        frontiers.implied_capability,
484                        frontiers.write_frontier,
485                    )
486                }),
487        );
488
489        Ok(result)
490    }
491
    /// Returns the metadata of all active collections, delegating to the
    /// shared [StorageCollections] handle.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }
495
496    fn active_ingestion_exports(
497        &self,
498        instance_id: StorageInstanceId,
499    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
500        let active_storage_collections: BTreeMap<_, _> = self
501            .storage_collections
502            .active_collection_frontiers()
503            .into_iter()
504            .map(|c| (c.id, c))
505            .collect();
506
507        let active_exports = self.instances[&instance_id]
508            .active_ingestion_exports()
509            .filter(move |id| {
510                let frontiers = active_storage_collections.get(id);
511                match frontiers {
512                    Some(frontiers) => !frontiers.write_frontier.is_empty(),
513                    None => {
514                        // Not "active", so we don't care here.
515                        false
516                    }
517                }
518            });
519
520        Box::new(active_exports)
521    }
522
    /// Checks that the identified collection exists, delegating to the shared
    /// [StorageCollections] handle.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }
526
527    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
528        let metrics = self.metrics.for_instance(id);
529        let mut instance = Instance::new(
530            workload_class,
531            metrics,
532            self.now.clone(),
533            self.instance_response_tx.clone(),
534        );
535        if self.initialized {
536            instance.send(StorageCommand::InitializationComplete);
537        }
538        if !self.read_only {
539            instance.send(StorageCommand::AllowWrites);
540        }
541
542        let params = Box::new(self.config.parameters.clone());
543        instance.send(StorageCommand::UpdateConfiguration(params));
544
545        let old_instance = self.instances.insert(id, instance);
546        assert_none!(old_instance, "storage instance {id} already exists");
547    }
548
549    fn drop_instance(&mut self, id: StorageInstanceId) {
550        let instance = self.instances.remove(&id);
551        assert!(instance.is_some(), "storage instance {id} does not exist");
552    }
553
554    fn update_instance_workload_class(
555        &mut self,
556        id: StorageInstanceId,
557        workload_class: Option<String>,
558    ) {
559        let instance = self
560            .instances
561            .get_mut(&id)
562            .unwrap_or_else(|| panic!("instance {id} does not exist"));
563
564        instance.workload_class = workload_class;
565    }
566
567    fn connect_replica(
568        &mut self,
569        instance_id: StorageInstanceId,
570        replica_id: ReplicaId,
571        location: ClusterReplicaLocation,
572    ) {
573        let instance = self
574            .instances
575            .get_mut(&instance_id)
576            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
577
578        let config = ReplicaConfig {
579            build_info: self.build_info,
580            location,
581            grpc_client: self.config.parameters.grpc_client.clone(),
582        };
583        instance.add_replica(replica_id, config);
584    }
585
    /// Drops a replica from a storage instance, updating drop-tracking state
    /// and (outside of read-only mode) recording "paused" status history for
    /// the sources and sinks that were running on it.
    ///
    /// # Panics
    /// Panics if no instance with `instance_id` exists, or if the instance
    /// refers to an ingestion the controller does not know about.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        // All synthesized updates share the same "paused because the replica
        // was dropped" shape; only the object id and kind differ.
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            // Stop expecting a DroppedId message from this replica for the
            // ingestion; forget the entry entirely once no replica is left.
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // "Old style" ingestions keep their remap data in a separate
            // collection; "new style" ones use the source id itself as the
            // remap collection id.
            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                // NOTE(aljoscha): We filter out the remap collection for old style
                // ingestions because it doesn't get any status updates about it from the
                // replica side. So we don't want to synthesize a 'paused' status here.
                // New style ingestion do, since the source itself contains the remap data.
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            // Same DroppedId bookkeeping as for ingestions above.
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        // In read-only mode we must not write to external systems, so the
        // synthesized status history is dropped on the floor.
        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }
672
673    async fn evolve_nullability_for_bootstrap(
674        &mut self,
675        storage_metadata: &StorageMetadata,
676        collections: Vec<(GlobalId, RelationDesc)>,
677    ) -> Result<(), StorageError<Self::Timestamp>> {
678        let persist_client = self
679            .persist
680            .open(self.persist_location.clone())
681            .await
682            .unwrap();
683
684        for (global_id, relation_desc) in collections {
685            let shard_id = storage_metadata.get_collection_shard(global_id)?;
686            let diagnostics = Diagnostics {
687                shard_name: global_id.to_string(),
688                handle_purpose: "evolve nullability for bootstrap".to_string(),
689            };
690            let latest_schema = persist_client
691                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
692                .await
693                .expect("invalid persist usage");
694            let Some((schema_id, current_schema, _)) = latest_schema else {
695                tracing::debug!(?global_id, "no schema registered");
696                continue;
697            };
698            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");
699
700            let diagnostics = Diagnostics {
701                shard_name: global_id.to_string(),
702                handle_purpose: "evolve nullability for bootstrap".to_string(),
703            };
704            let evolve_result = persist_client
705                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
706                    shard_id,
707                    schema_id,
708                    &relation_desc,
709                    &UnitSchema,
710                    diagnostics,
711                )
712                .await
713                .expect("invalid persist usage");
714            match evolve_result {
715                CaESchema::Ok(_) => (),
716                CaESchema::ExpectedMismatch {
717                    schema_id,
718                    key,
719                    val: _,
720                } => {
721                    return Err(StorageError::PersistSchemaEvolveRace {
722                        global_id,
723                        shard_id,
724                        schema_id,
725                        relation_desc: key,
726                    });
727                }
728                CaESchema::Incompatible => {
729                    return Err(StorageError::PersistInvalidSchemaEvolve {
730                        global_id,
731                        shard_id,
732                    });
733                }
734            };
735        }
736
737        Ok(())
738    }
739
    /// Create and "execute" the described collection.
    ///
    /// "Execute" is in scare quotes because what executing a collection means
    /// varies widely based on the type of collection you're creating.
    ///
    /// The general process creating a collection undergoes is:
    /// 1. Enrich the description we get from the user with the metadata only
    ///    the storage controller's metadata. This is mostly a matter of
    ///    separating concerns.
    /// 2. Generate write and read persist handles for the collection.
    /// 3. Store the collection's metadata in the appropriate field.
    /// 4. "Execute" the collection. What that means is contingent on the type of
    ///    collection. so consult the code for more details.
    ///
    /// `register_ts` must be `Some` when `collections` contains a table
    /// (`DataSource::Table`): the table-worker registration below `expect`s
    /// it.
    ///
    /// `migrated_storage_collections` affects which collections count as
    /// being "in txns" and, in read-only mode, which tables get registered
    /// with the table worker.
    ///
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods. @guswynn observes
    // that a number of these operations could be moved into fns on
    // `DataSource`.
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        // Let StorageCollections do its part of the bootstrap first; it
        // validates and sets up frontiers/since handling for the same set of
        // collections.
        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // At this point we're connected to all the collection shards in persist. Our warming task
        // is no longer useful, so abort it if it's still running.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state.
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        // After dedup, any remaining adjacent duplicate id means the same id
        // was submitted with two *different* descriptions — reject that.
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially, tables), then we need to
                // pass along the shard id for the txns shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open persist handles for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Reborrow the `&mut self` as immutable, as all the concurrent work to be processed in
        // this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered` (which
            // `buffer_unordered` uses underneath:
            // - One is captured here <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a `FuturesUnordered`
            // stream attempts to obtain an async mutex that is also obtained in the futures
            // being polled.
            //
            // Both of these could potentially be issues in all usages of `buffer_unordered` in
            // this method, so we stick the standard advice: only use `try_collect` or
            // `collect`!
            .try_collect()
            .await?;

        // The set of collections that we should render at the end of this
        // function.
        let mut to_execute = BTreeSet::new();
        // New collections that are being created; this is distinct from the set
        // of collections we plan to execute because
        // `DataSource::IngestionExport` is added as a new collection, but is
        // not executed directly.
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        // Reorder in dependency order.
        to_register.sort_by_key(|(id, ..)| *id);

        // Register tables first, but register them in reverse order since earlier tables
        // can depend on later tables.
        //
        // Note: We could do more complex sorting to avoid the allocations, but IMO it's
        // easier to reason about it this way.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics need a level of indirection so we can mutably borrow
        // `self` when registering collections and when we are inserting
        // statistics.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            // A collection counts as txn-wal-managed ("in txns") when it has a
            // txns shard, except for migrated collections while in read-only
            // mode (those go through the read-only table worker below).
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            // Determine if this collection has another dependency.
            let storage_dependencies = self.determine_collection_dependencies(id, &description)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            // The joined since of all dependencies; the frontier below which
            // we can no longer read from any of them.
            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            let data_source = description.data_source;

            // Assert some invariants.
            //
            // TODO(alter_table): Include Tables (is_in_txns) in this check. After
            // supporting ALTER TABLE, it's now possible for a table to have a dependency
            // and thus run this check. But tables are managed by txn-wal and thus the
            // upper of the shard's `write_handle` generally isn't the logical upper of
            // the shard. Instead we need to thread through the upper of the `txn` shard
            // here so we can check this invariant.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                // As the halt message says, this can happen when trying to come
                // up in read-only mode and the current read-write
                // environmentd/controller deletes a collection. We exit
                // gracefully, which means we'll get restarted and get to try
                // again.
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                // If the dependency is between a remap shard and a source (and
                // not a "primary" dependency), the dependency since cannot be
                // beyond the dependent (our) upper unless the collection is
                // new. If the since is allowed to "catch up" to the upper, that
                // is `upper <= since`, a restarting ingestion cannot
                // differentiate between updates that have already been written
                // out to the backing persist shard and updates that have yet to
                // be written. We would write duplicate updates.
                //
                // If this check fails, it means that the read hold installed on
                // the dependency was probably not upheld –– if it were, the
                // dependency's since could not have advanced as far the
                // dependent's upper.
                //
                // We don't care about the dependency since when the write
                // frontier is empty. In that case, no-one can write down any
                // more updates.
                if description.primary.is_none() {
                    mz_ore::soft_assert_or_log!(
                        write_frontier.elements() == &[T::minimum()]
                            || write_frontier.is_empty()
                            || PartialOrder::less_than(&dependency_since, write_frontier),
                        "dependency since has advanced past dependent ({id}) upper \n
                            dependent ({id}): upper {:?} \n
                            dependency since {:?} \n
                            dependency read holds: {:?}",
                        write_frontier,
                        dependency_since,
                        dependency_read_holds,
                    );
                }
            }

            // Perform data source-specific setup.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    // We always register the collection with the collection manager,
                    // regardless of read-only mode. The CollectionManager itself is
                    // aware of read-only mode and will not attempt to write before told
                    // to do so.
                    //
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    // This collection of statistics is periodically aggregated into
                    // `source_statistics`.
                    new_webhook_statistic_entries.insert(id);
                    // Register the collection so our manager knows about it.
                    //
                    // NOTE: Maybe this shouldn't be in the collection manager,
                    // and collection manager should only be responsible for
                    // built-in introspection collections?
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            // Record the ingestion's cluster ID for the
                            // ingestion export. This way we always have a
                            // record of it, even if the ingestion's collection
                            // description disappears.
                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the source export doesn't do anything, ensure we execute the source instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    // Tables are registered with the table worker in one batch
                    // below, after this loop.
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    // A sink is expected to have exactly two read holds: one
                    // on itself and one on the collection it reads.
                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            // Webhooks don't run on clusters/replicas, so we initialize their
            // statistics collection here.
            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }

            // Sources and sinks only have statistics in the collection when
            // there is a replica that is reporting them. No need to initialize
            // here.
        }

        // Register the tables all in one batch.
        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            // NOTE: both branches below register identically; read-only mode
            // merely filters the set down to migrated tables first.
            if self.read_only {
                // In read-only mode, we use a special read-only table worker
                // that allows writing to migrated tables and will continually
                // bump their shard upper so that it tracks the txn shard upper.
                // We do this, so that they remain readable at a recent
                // timestamp, which in turn allows dataflows that depend on them
                // to (re-)hydrate.
                //
                // We only want to register migrated tables, though, and leave
                // existing tables out/never write to them in read-only mode.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // TODO(guswynn): perform the io in this final section concurrently.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    // In read-only mode, only run ingestions whose connection
                    // supports read-only operation and when 0dt sources are
                    // enabled.
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }
1225
1226    fn check_alter_ingestion_source_desc(
1227        &mut self,
1228        ingestion_id: GlobalId,
1229        source_desc: &SourceDesc,
1230    ) -> Result<(), StorageError<Self::Timestamp>> {
1231        let source_collection = self.collection(ingestion_id)?;
1232        let data_source = &source_collection.data_source;
1233        match &data_source {
1234            DataSource::Ingestion(cur_ingestion) => {
1235                cur_ingestion
1236                    .desc
1237                    .alter_compatible(ingestion_id, source_desc)?;
1238            }
1239            o => {
1240                tracing::info!(
1241                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1242                    o
1243                );
1244                Err(AlterError { id: ingestion_id })?
1245            }
1246        }
1247
1248        Ok(())
1249    }
1250
1251    async fn alter_ingestion_source_desc(
1252        &mut self,
1253        ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
1254    ) -> Result<(), StorageError<Self::Timestamp>> {
1255        let mut ingestions_to_run = BTreeSet::new();
1256
1257        for (id, new_desc) in ingestion_ids {
1258            let collection = self
1259                .collections
1260                .get_mut(&id)
1261                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1262
1263            match &mut collection.data_source {
1264                DataSource::Ingestion(ingestion) => {
1265                    if ingestion.desc != new_desc {
1266                        tracing::info!(
1267                            from = ?ingestion.desc,
1268                            to = ?new_desc,
1269                            "alter_ingestion_source_desc, updating"
1270                        );
1271                        ingestion.desc = new_desc;
1272                        ingestions_to_run.insert(id);
1273                    }
1274                }
1275                o => {
1276                    tracing::warn!("alter_ingestion_source_desc called on {:?}", o);
1277                    Err(StorageError::IdentifierInvalid(id))?;
1278                }
1279            }
1280        }
1281
1282        for id in ingestions_to_run {
1283            self.run_ingestion(id)?;
1284        }
1285        Ok(())
1286    }
1287
1288    async fn alter_ingestion_connections(
1289        &mut self,
1290        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1291    ) -> Result<(), StorageError<Self::Timestamp>> {
1292        let mut ingestions_to_run = BTreeSet::new();
1293
1294        for (id, conn) in source_connections {
1295            let collection = self
1296                .collections
1297                .get_mut(&id)
1298                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1299
1300            match &mut collection.data_source {
1301                DataSource::Ingestion(ingestion) => {
1302                    // If the connection hasn't changed, there's no sense in
1303                    // re-rendering the dataflow.
1304                    if ingestion.desc.connection != conn {
1305                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1306                        ingestion.desc.connection = conn;
1307                        ingestions_to_run.insert(id);
1308                    } else {
1309                        tracing::warn!(
1310                            "update_source_connection called on {id} but the \
1311                            connection was the same"
1312                        );
1313                    }
1314                }
1315                o => {
1316                    tracing::warn!("update_source_connection called on {:?}", o);
1317                    Err(StorageError::IdentifierInvalid(id))?;
1318                }
1319            }
1320        }
1321
1322        for id in ingestions_to_run {
1323            self.run_ingestion(id)?;
1324        }
1325        Ok(())
1326    }
1327
1328    async fn alter_ingestion_export_data_configs(
1329        &mut self,
1330        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1331    ) -> Result<(), StorageError<Self::Timestamp>> {
1332        let mut ingestions_to_run = BTreeSet::new();
1333
1334        for (source_export_id, new_data_config) in source_exports {
1335            // We need to adjust the data config on the CollectionState for
1336            // the source export collection directly
1337            let source_export_collection = self
1338                .collections
1339                .get_mut(&source_export_id)
1340                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1341            let ingestion_id = match &mut source_export_collection.data_source {
1342                DataSource::IngestionExport {
1343                    ingestion_id,
1344                    details: _,
1345                    data_config,
1346                } => {
1347                    *data_config = new_data_config.clone();
1348                    *ingestion_id
1349                }
1350                o => {
1351                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1352                    Err(StorageError::IdentifierInvalid(source_export_id))?
1353                }
1354            };
1355            // We also need to adjust the data config on the CollectionState of the
1356            // Ingestion that the export is associated with.
1357            let ingestion_collection = self
1358                .collections
1359                .get_mut(&ingestion_id)
1360                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
1361
1362            match &mut ingestion_collection.data_source {
1363                DataSource::Ingestion(ingestion_desc) => {
1364                    let source_export = ingestion_desc
1365                        .source_exports
1366                        .get_mut(&source_export_id)
1367                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1368
1369                    // If the data config hasn't changed, there's no sense in
1370                    // re-rendering the dataflow.
1371                    if source_export.data_config != new_data_config {
1372                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
1373                        source_export.data_config = new_data_config;
1374
1375                        ingestions_to_run.insert(ingestion_id);
1376                    } else {
1377                        tracing::warn!(
1378                            "alter_ingestion_export_data_configs called on \
1379                                    export {source_export_id} of {ingestion_id} but \
1380                                    the data config was the same"
1381                        );
1382                    }
1383                }
1384                o => {
1385                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1386                    Err(StorageError::IdentifierInvalid(ingestion_id))?
1387                }
1388            }
1389        }
1390
1391        for id in ingestions_to_run {
1392            self.run_ingestion(id)?;
1393        }
1394        Ok(())
1395    }
1396
    /// Evolves the schema of an existing table collection.
    ///
    /// Registers `new_collection` under the updated `new_desc`, backed by the
    /// *same* persist data shard as `existing_collection`, and registers the
    /// new collection with the persist table worker at `register_ts`.
    ///
    /// # Errors
    ///
    /// Returns `IdentifierMissing` if `existing_collection` is unknown and
    /// `IdentifierInvalid` if it is not a `DataSource::Table`.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            // Only tables support this schema-evolution path.
            if existing.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Let StorageCollections know!
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            // The new collection keeps writing to the existing data shard.
            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        // Open a write handle against the shared shard using the new schema.
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            // TODO(alter_table): Support schema evolution on sources.
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        // TODO(alter_table): Support schema evolution on sources.
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            DataSource::Table,
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        // Great! We have successfully evolved the schema of our Table, now we need to update our
        // in-memory data structures.
        self.collections.insert(new_collection, collection_state);

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        // Record the new collection->shard mapping in introspection.
        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }
1475
1476    fn export(
1477        &self,
1478        id: GlobalId,
1479    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1480        self.collections
1481            .get(&id)
1482            .and_then(|c| match &c.extra_state {
1483                CollectionStateExtra::Export(state) => Some(state),
1484                _ => None,
1485            })
1486            .ok_or(StorageError::IdentifierMissing(id))
1487    }
1488
1489    fn export_mut(
1490        &mut self,
1491        id: GlobalId,
1492    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1493        self.collections
1494            .get_mut(&id)
1495            .and_then(|c| match &mut c.extra_state {
1496                CollectionStateExtra::Export(state) => Some(state),
1497                _ => None,
1498            })
1499            .ok_or(StorageError::IdentifierMissing(id))
1500    }
1501
1502    /// Create a oneshot ingestion.
1503    async fn create_oneshot_ingestion(
1504        &mut self,
1505        ingestion_id: uuid::Uuid,
1506        collection_id: GlobalId,
1507        instance_id: StorageInstanceId,
1508        request: OneshotIngestionRequest,
1509        result_tx: OneshotResultCallback<ProtoBatch>,
1510    ) -> Result<(), StorageError<Self::Timestamp>> {
1511        let collection_meta = self
1512            .collections
1513            .get(&collection_id)
1514            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1515            .collection_metadata
1516            .clone();
1517        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1518            // TODO(cf2): Refine this error.
1519            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1520        })?;
1521        let oneshot_cmd = RunOneshotIngestion {
1522            ingestion_id,
1523            collection_id,
1524            collection_meta,
1525            request,
1526        };
1527
1528        if !self.read_only {
1529            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1530            let pending = PendingOneshotIngestion {
1531                result_tx,
1532                cluster_id: instance_id,
1533            };
1534            let novel = self
1535                .pending_oneshot_ingestions
1536                .insert(ingestion_id, pending);
1537            assert_none!(novel);
1538            Ok(())
1539        } else {
1540            Err(StorageError::ReadOnly)
1541        }
1542    }
1543
1544    fn cancel_oneshot_ingestion(
1545        &mut self,
1546        ingestion_id: uuid::Uuid,
1547    ) -> Result<(), StorageError<Self::Timestamp>> {
1548        if self.read_only {
1549            return Err(StorageError::ReadOnly);
1550        }
1551
1552        let pending = self
1553            .pending_oneshot_ingestions
1554            .remove(&ingestion_id)
1555            .ok_or_else(|| {
1556                // TODO(cf2): Refine this error.
1557                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1558            })?;
1559
1560        match self.instances.get_mut(&pending.cluster_id) {
1561            Some(instance) => {
1562                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1563            }
1564            None => {
1565                mz_ore::soft_panic_or_log!(
1566                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1567                    ingestion_id,
1568                    pending.cluster_id,
1569                );
1570            }
1571        }
1572        // Respond to the user that the request has been canceled.
1573        pending.cancel();
1574
1575        Ok(())
1576    }
1577
    /// Alters an existing export (sink) to match `new_description` and sends
    /// a `RunSink` command to the export's cluster to re-render the sink
    /// dataflow.
    ///
    /// # Errors
    ///
    /// Returns `ReadBeforeSince` if the sink's input can no longer be read
    /// from the newly acquired read hold, and `ExportInstanceMissing` if the
    /// target cluster is unknown.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        // Acquire read holds at StorageCollections to ensure that the
        // sinked collection is not dropped while we're sinking it.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Check whether the sink's write frontier is beyond the read hold we got
        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        // Install the new export state, carrying over capabilities, policy,
        // and write frontier but adopting the new cluster and read holds.
        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        // For `ALTER SINK`, the snapshot should only occur if the sink has not made any progress.
        // This prevents unnecessary decoding in the sink.
        // If the write frontier of the sink is strictly larger than its read hold, it must have at
        // least written out its snapshot, and we can skip reading it; otherwise assume we may have
        // to replay from the beginning.
        // TODO(database-issues#10002): unify this with run_export, if possible
        let with_snapshot = new_description.sink.with_snapshot
            && !PartialOrder::less_than(&new_description.sink.as_of, &cur_export.write_frontier);

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: new_description.sink.commit_interval,
            },
        };

        // Fetch the client for this export's cluster.
        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }
1654
    /// Alters the connections of the sinks identified by the keys of
    /// `exports`, re-issuing `RunSink` commands to the relevant clusters and
    /// then updating the stored export descriptions.
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Commands and new descriptions, grouped by the cluster they target.
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            // We stage changes in new_export_description and then apply all
            // updates to exports at the end.
            //
            // We don't just go ahead and clone the `ExportState` itself and
            // update that because `ExportState` is not clone, because it holds
            // a `ReadHandle` and cloning that would cause additional work for
            // whoever guarantees those read holds.
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Ensure compatibility
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    // Here we are about to send a RunSinkCommand with the current read capability
                    // held by this sink. However, clusters are already running a version of the
                    // sink and nothing guarantees that by the time this command arrives at the
                    // clusters they won't have made additional progress such that this read
                    // capability is invalidated.
                    // The solution to this problem is for the controller to track specific
                    // executions of dataflows such that it can track the shutdown of the current
                    // instance and the initialization of the new instance separately and ensure
                    // read holds are held for the correct amount of time.
                    // TODO(petrosagg): change the controller to explicitly track dataflow executions
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                    commit_interval: new_export_description.sink.commit_interval,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the client for this export's cluster.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update state only after all possible errors have occurred.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }
1766
1767    // Dropping a table takes roughly the following flow:
1768    //
1769    // First determine if this is a TableWrites table or a source-fed table (an IngestionExport):
1770    //
1771    // If this is a TableWrites table:
1772    //   1. We remove the table from the persist table write worker.
1773    //   2. The table removal is awaited in an async task.
1774    //   3. A message is sent to the storage controller that the table has been removed from the
1775    //      table write worker.
1776    //   4. The controller drains all table drop messages during `process`.
1777    //   5. `process` calls `drop_sources` with the dropped tables.
1778    //
1779    // If this is an IngestionExport table:
1780    //   1. We validate the ids and then call drop_sources_unvalidated to proceed dropping.
1781    fn drop_tables(
1782        &mut self,
1783        storage_metadata: &StorageMetadata,
1784        identifiers: Vec<GlobalId>,
1785        ts: Self::Timestamp,
1786    ) -> Result<(), StorageError<Self::Timestamp>> {
1787        // Collect tables by their data_source
1788        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1789            .into_iter()
1790            .partition(|id| match self.collections[id].data_source {
1791                DataSource::Table => true,
1792                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1793                _ => panic!("identifier is not a table: {}", id),
1794            });
1795
1796        // Drop table write tables
1797        if table_write_ids.len() > 0 {
1798            let drop_notif = self
1799                .persist_table_worker
1800                .drop_handles(table_write_ids.clone(), ts);
1801            let tx = self.pending_table_handle_drops_tx.clone();
1802            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1803                drop_notif.await;
1804                for identifier in table_write_ids {
1805                    let _ = tx.send(identifier);
1806                }
1807            });
1808        }
1809
1810        // Drop source-fed tables
1811        if data_source_ids.len() > 0 {
1812            self.validate_collection_ids(data_source_ids.iter().cloned())?;
1813            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1814        }
1815
1816        Ok(())
1817    }
1818
1819    fn drop_sources(
1820        &mut self,
1821        storage_metadata: &StorageMetadata,
1822        identifiers: Vec<GlobalId>,
1823    ) -> Result<(), StorageError<Self::Timestamp>> {
1824        self.validate_collection_ids(identifiers.iter().cloned())?;
1825        self.drop_sources_unvalidated(storage_metadata, identifiers)
1826    }
1827
    /// Drops the given source collections without first validating the ids.
    ///
    /// Amends or drops the affected ingestions, releases read holds by
    /// installing empty-frontier read policies, records `Dropped` status
    /// updates, cleans up statistics and in-memory collection state, and
    /// finally forwards the drop to `StorageCollections`. Unknown ids are
    /// silently skipped for the in-memory cleanup but still forwarded.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Keep track of which ingestions we have to execute still, and which we
        // have to change because of dropped subsources.
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        // Ingestions (and their exports) are also collections, but we keep
        // track of non-ingestion collections separately, because they have
        // slightly different cleanup logic below.
        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
                        // could probably use some love and maybe get merged together?
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // If we are dropping source exports, we need to modify the
                        // ingestion that it runs on.
                        //
                        // If we remove this export, we need to stop producing data to
                        // it, so plan to re-execute the ingestion with the amended
                        // description.
                        ingestions_to_execute.insert(ingestion_id);

                        // Adjust the source to remove this export.
                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            // Primary ingestion already dropped.
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        // Ingestion exports also have ReadHolds that we need to
                        // downgrade, and much of their drop machinery is the
                        // same as for the "main" ingestion.
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        // Collections of these types are either not sources and should be dropped
                        // through other means, or are sources but should never be dropped.
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not bother re-executing ingestions we know we plan to drop.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // For ingestions, we fabricate a new hold that will propagate through
        // the cluster and then back to us.

        // We don't explicitly remove read capabilities! Downgrading the
        // frontier of the source to `[]` (the empty Antichain), will propagate
        // to the storage dependencies.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Delete all collection->shard mappings
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        // Record a `Dropped` status event for every dropped ingestion.
        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        // Drop statistics for the removed sources. The lock is held only for
        // these removals.
        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        // Remove collection state
        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            // Record which replicas were running a collection, so that we can
            // match DroppedId messages against them and eventually remove state
            // from self.dropped_objects
            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    // The remap collection of an ingestion doesn't have extra
                    // state and doesn't have an instance_id, but we still get
                    // upper updates for them, so want to make sure to populate
                    // dropped_ids.
                    // TODO(aljoscha): All this is a bit icky. But here we are
                    // for now...
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }
2023
2024    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
2025    fn drop_sinks(
2026        &mut self,
2027        storage_metadata: &StorageMetadata,
2028        identifiers: Vec<GlobalId>,
2029    ) -> Result<(), StorageError<Self::Timestamp>> {
2030        self.validate_export_ids(identifiers.iter().cloned())?;
2031        self.drop_sinks_unvalidated(storage_metadata, identifiers);
2032        Ok(())
2033    }
2034
    /// Drops the given sinks without first validating the ids.
    ///
    /// Releases read holds by installing empty-frontier read policies,
    /// records `Dropped` status events, removes statistics and in-memory
    /// export state, and forwards the drop to `StorageCollections`. Ids that
    /// are not known exports are silently ignored.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        // Ignore exports that have already been removed.
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        // TODO: ideally we'd advance the write frontier ourselves here, but this function's
        // not yet marked async.

        // We don't explicitly remove read capabilities! Downgrading the
        // frontier of the source to `[]` (the empty Antichain), will propagate
        // to the storage dependencies.
        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        // NOTE(review): the message says "sources" but this is the sink drop
        // path — looks copy-pasted from `drop_sources_unvalidated`; confirm
        // before relying on it in log searches.
        tracing::debug!(
            ?drop_policy,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        // Record the drop status for all sink drops.
        //
        // We also delete the items' statistics objects.
        //
        // The locks are held for a short time, only while we do some removals from a map.

        let status_now = mz_ore::now::to_datetime((self.now)());

        // Record the drop status for all pending sink drops.
        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        // Remove collection/export state
        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            // Record how many replicas were running an export, so that we can
            // match `DroppedId` messages against it and eventually remove state
            // from `self.dropped_objects`.
            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }
2115
2116    #[instrument(level = "debug")]
2117    fn append_table(
2118        &mut self,
2119        write_ts: Self::Timestamp,
2120        advance_to: Self::Timestamp,
2121        commands: Vec<(GlobalId, Vec<TableData>)>,
2122    ) -> Result<
2123        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2124        StorageError<Self::Timestamp>,
2125    > {
2126        if self.read_only {
2127            // While in read only mode, ONLY collections that have been migrated
2128            // and need to be re-hydrated in read only mode can be written to.
2129            if !commands
2130                .iter()
2131                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2132            {
2133                return Err(StorageError::ReadOnly);
2134            }
2135        }
2136
2137        // TODO(petrosagg): validate appends against the expected RelationDesc of the collection
2138        for (id, updates) in commands.iter() {
2139            if !updates.is_empty() {
2140                if !write_ts.less_than(&advance_to) {
2141                    return Err(StorageError::UpdateBeyondUpper(*id));
2142                }
2143            }
2144        }
2145
2146        Ok(self
2147            .persist_table_worker
2148            .append(write_ts, advance_to, commands))
2149    }
2150
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        // Pure delegation: the collection manager hands back an appender for
        // `id`; any error from the manager is surfaced unchanged.
        self.collection_manager.monotonic_appender(id)
    }
2157
2158    fn webhook_statistics(
2159        &self,
2160        id: GlobalId,
2161    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2162        // Call to this method are usually cached so the lock is not in the critical path.
2163        let source_statistics = self.source_statistics.lock().expect("poisoned");
2164        source_statistics
2165            .webhook_statistics
2166            .get(&id)
2167            .cloned()
2168            .ok_or(StorageError::IdentifierMissing(id))
2169    }
2170
    async fn ready(&mut self) {
        // A maintenance pass is already due: return immediately so the caller
        // drives `process` instead of parking on the select below.
        if self.maintenance_scheduled {
            return;
        }

        // Queued table-handle drops are handled in `process`, so we are
        // immediately ready while any are pending.
        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                // Stash the first response, then greedily drain whatever else
                // is already queued so `process` can handle them as one batch.
                self.stashed_responses.push(m);
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }
2192
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        // Perform periodic maintenance work.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        // Give each instance a chance to replace replicas that have failed.
        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        // Take the currently stashed responses so that we can call mut receiver functions in the loop.
        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    // Propagate the new upper internally and remember it so it
                    // can be reported to the caller below.
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    // `dropped_objects` tracks which replicas still have to
                    // acknowledge a drop; remove this replica's ack and forget
                    // the object once all acks have arrived.
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    // Note we only hold the locks while moving some plain-old-data around here.
                    {
                        // NOTE(aljoscha): We explicitly unwrap the `Option`,
                        // because we expect that stats coming from replicas
                        // have a replica id.
                        //
                        // If this is `None` and we would use that to access
                        // state below we might clobber something unexpectedly.
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                // We can get updates for collections that have
                                // already been deleted, ignore those.
                                continue;
                            }

                            // Keyed by (collection, replica): stats are
                            // tracked per replica, not globally.
                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    {
                        // NOTE(aljoscha); Same as above. We want to be
                        // explicit.
                        //
                        // Also, technically for sinks there is no webhook
                        // "sources" that would force us to use an `Option`. But
                        // we still have to use an option for other reasons: the
                        // scraper expects a trait for working with stats, and
                        // that in the end forces the sink stats map to also
                        // have `Option` in the key. Do I like that? No, but
                        // here we are.
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                // We can get updates for collections that have
                                // already been deleted, ignore those.
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    // NOTE(aljoscha): We sniff out the hydration status for
                    // ingestions from status updates. This is the easiest we
                    // can do right now, without going deeper into changing the
                    // comms protocol between controller and cluster. We cannot,
                    // for example use `StorageResponse::FrontierUpper`,
                    // because those will already get sent when the ingestion is
                    // just being created.
                    //
                    // Sources differ in when they will report as Running. Kafka
                    // UPSERT sources will only switch to `Running` once their
                    // state has been initialized from persist, which is the
                    // first case that we care about right now.
                    //
                    // I wouldn't say it's ideal, but it's workable until we
                    // find something better.
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            if ingestion_state.hydrated_on.is_empty() {
                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                            }
                                            ingestion_state.hydrated_on.insert(replica_id.expect(
                                                "replica id should be present for status running",
                                            ));
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // TODO(sinks): track sink hydration?
                                        }
                                        CollectionStateExtra::None => {
                                            // Nothing to do
                                        }
                                    }
                                }
                                None => (), // no collection, let's say that's fine
                                            // here
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            // TODO: Paused gets send when there
                                            // are no active replicas. We should
                                            // change this to send a targeted
                                            // Pause for each replica, and do
                                            // more fine-grained hydration
                                            // tracking here.
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                            ingestion_state.hydrated_on.clear();
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // TODO(sinks): track sink hydration?
                                        }
                                        CollectionStateExtra::None => {
                                            // Nothing to do
                                        }
                                    }
                                }
                                None => (), // no collection, let's say that's fine
                                            // here
                            }
                        }
                        _ => (),
                    }

                    // Set replica_id in the status update if available
                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Send a cancel command so our command history is correct. And to
                                // avoid duplicate work once we have active replication.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                // Send the results down our channel.
                                (pending.result_tx)(batches)
                            }
                            None => {
                                // We might not be tracking this oneshot ingestion anymore because
                                // it was canceled.
                            }
                        }
                    }
                }
            }
        }

        // Forward the collected status updates to the status-history
        // introspection machinery.
        self.record_status_updates(status_updates);

        // Process dropped tables in a single batch.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        // Only frontier movements are reported to the caller; `None` means
        // nothing of interest happened this round.
        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }
2453
2454    async fn inspect_persist_state(
2455        &self,
2456        id: GlobalId,
2457    ) -> Result<serde_json::Value, anyhow::Error> {
2458        let collection = &self.storage_collections.collection_metadata(id)?;
2459        let client = self
2460            .persist
2461            .open(collection.persist_location.clone())
2462            .await?;
2463        let shard_state = client
2464            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2465            .await?;
2466        let json_state = serde_json::to_value(shard_state)?;
2467        Ok(json_state)
2468    }
2469
2470    fn append_introspection_updates(
2471        &mut self,
2472        type_: IntrospectionType,
2473        updates: Vec<(Row, Diff)>,
2474    ) {
2475        let id = self.introspection_ids[&type_];
2476        let updates = updates.into_iter().map(|update| update.into()).collect();
2477        self.collection_manager.blind_write(id, updates);
2478    }
2479
2480    fn append_status_introspection_updates(
2481        &mut self,
2482        type_: IntrospectionType,
2483        updates: Vec<StatusUpdate>,
2484    ) {
2485        let id = self.introspection_ids[&type_];
2486        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2487        if !updates.is_empty() {
2488            self.collection_manager.blind_write(id, updates);
2489        }
2490    }
2491
2492    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2493        let id = self.introspection_ids[&type_];
2494        self.collection_manager.differential_write(id, op);
2495    }
2496
2497    fn append_only_introspection_tx(
2498        &self,
2499        type_: IntrospectionType,
2500    ) -> mpsc::UnboundedSender<(
2501        Vec<AppendOnlyUpdate>,
2502        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2503    )> {
2504        let id = self.introspection_ids[&type_];
2505        self.collection_manager.append_only_write_sender(id)
2506    }
2507
2508    fn differential_introspection_tx(
2509        &self,
2510        type_: IntrospectionType,
2511    ) -> mpsc::UnboundedSender<(
2512        StorageWriteOp,
2513        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2514    )> {
2515        let id = self.introspection_ids[&type_];
2516        self.collection_manager.differential_write_sender(id)
2517    }
2518
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        // One future per RTR-capable source; each resolves to that source's
        // real-time-recency timestamp (or times out).
        let mut rtr_futures = BTreeMap::new();

        // Only user sources can be read from w/ RTR.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                // Not a storage item, which we accept.
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    // These internal sources do not yet (and might never)
                    // support RTR. However, erroring if they're selected from
                    // poses an annoying user experience, so instead just skip
                    // over them.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                // Skip over all other objects
                _ => {
                    continue;
                }
            };

            // Prepare for getting the external system's frontier.
            let config = self.config().clone();

            // Determine the remap collection we plan to read from.
            //
            // Note that the process of reading from the remap shard is the same
            // as other areas in this code that do the same thing, but we inline
            // it here because we must prove that we have not taken ownership of
            // `self` to move the stream of data from the remap shard into a
            // future.
            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Have to acquire a read hold to prevent the since from advancing
            // while we read.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            // An empty since antichain would mean the collection is closed,
            // hence unreadable.
            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    // Fetch the remap shard's contents; we must do this first so
                    // that the `as_of` doesn't change.
                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(
                        source_conn,
                        id,
                        config,
                        as_of,
                        remap_subscribe,
                    )
                    .await
                    .map_err(|e| {
                            tracing::debug!(?id, "real time recency error: {:?}", e);
                            e
                        });

                    // Drop the read hold once we have read successfully; the
                    // since may advance freely after this point.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        // The returned future resolves all per-source timestamps concurrently
        // and folds them into their maximum; a timeout on any single source
        // fails the whole request with `RtrTimeout` for that source.
        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
2637
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` here so we don't forget to consider dumping newly added fields.
        // Fields matched with `_` are deliberately excluded from the dump.
        let Self {
            build_info: _,
            now: _,
            read_only,
            collections,
            dropped_objects,
            persist_table_worker: _,
            txns_read: _,
            txns_metrics: _,
            stashed_responses,
            pending_table_handle_drops_tx: _,
            pending_table_handle_drops_rx: _,
            pending_oneshot_ingestions,
            collection_manager: _,
            introspection_ids,
            introspection_tokens: _,
            source_statistics: _,
            sink_statistics: _,
            statistics_interval_sender: _,
            instances,
            initialized,
            config,
            persist_location,
            persist: _,
            metrics: _,
            recorded_frontiers,
            recorded_replica_frontiers,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            storage_collections: _,
            migrated_storage_collections,
            maintenance_ticker: _,
            maintenance_scheduled,
            instance_response_tx: _,
            instance_response_rx: _,
            persist_warm_task: _,
        } = self;

        // Render maps and collections into string-keyed, Debug-formatted
        // shapes so everything below is directly JSON-serializable.
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let dropped_objects: BTreeMap<_, _> = dropped_objects
            .iter()
            .map(|(id, rs)| (id.to_string(), format!("{rs:?}")))
            .collect();
        let stashed_responses: Vec<_> =
            stashed_responses.iter().map(|r| format!("{r:?}")).collect();
        let pending_oneshot_ingestions: BTreeMap<_, _> = pending_oneshot_ingestions
            .iter()
            .map(|(uuid, i)| (uuid.to_string(), format!("{i:?}")))
            .collect();
        let introspection_ids: BTreeMap<_, _> = introspection_ids
            .iter()
            .map(|(typ, id)| (format!("{typ:?}"), id.to_string()))
            .collect();
        let instances: BTreeMap<_, _> = instances
            .iter()
            .map(|(id, i)| (id.to_string(), format!("{i:?}")))
            .collect();
        let recorded_frontiers: BTreeMap<_, _> = recorded_frontiers
            .iter()
            .map(|(id, fs)| (id.to_string(), format!("{fs:?}")))
            .collect();
        let recorded_replica_frontiers: Vec<_> = recorded_replica_frontiers
            .iter()
            .map(|((gid, rid), f)| (gid.to_string(), rid.to_string(), format!("{f:?}")))
            .collect();
        let migrated_storage_collections: Vec<_> = migrated_storage_collections
            .iter()
            .map(|id| id.to_string())
            .collect();

        Ok(serde_json::json!({
            "read_only": read_only,
            "collections": collections,
            "dropped_objects": dropped_objects,
            "stashed_responses": stashed_responses,
            "pending_oneshot_ingestions": pending_oneshot_ingestions,
            "introspection_ids": introspection_ids,
            "instances": instances,
            "initialized": initialized,
            "config": format!("{config:?}"),
            "persist_location": format!("{persist_location:?}"),
            "recorded_frontiers": recorded_frontiers,
            "recorded_replica_frontiers": recorded_replica_frontiers,
            "wallclock_lag_last_recorded": format!("{wallclock_lag_last_recorded:?}"),
            "migrated_storage_collections": migrated_storage_collections,
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
2731}
2732
2733/// Seed [`StorageTxn`] with any state required to instantiate a
2734/// [`StorageController`].
2735///
2736/// This cannot be a member of [`StorageController`] because it cannot take a
2737/// `self` parameter.
2738///
2739pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2740    if txn.get_txn_wal_shard().is_none() {
2741        let txns_id = ShardId::new();
2742        txn.write_txn_wal_shard(txns_id)?;
2743    }
2744
2745    Ok(())
2746}
2747
2748impl<T> Controller<T>
2749where
2750    T: Timestamp
2751        + Lattice
2752        + TotalOrder
2753        + Codec64
2754        + From<EpochMillis>
2755        + TimestampManipulation
2756        + Into<Datum<'static>>,
2757    Self: StorageController<Timestamp = T>,
2758{
    /// Create a new storage controller from a client it should wrap.
    ///
    /// Note that when creating a new storage controller, you must also
    /// reconcile it with the previous state.
    ///
    /// In `read_only` mode the table write worker only follows the txns upper
    /// instead of opening a full txns handle.
    ///
    /// # Panics
    /// If this function is called before [`prepare_initialization`].
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Kick off background warming of persist state for all collections we
        // already know about; the handle aborts the task when dropped.
        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        // This value must be already installed because we must ensure it's
        // durably recorded before it is used, otherwise we risk leaking persist
        // state.
        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        // In read-only mode we must not write to the txns shard, so the table
        // worker only opens a plain writer to follow the txns upper.
        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let mut txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;
            // Upgrade the txns shard to the current version before use.
            txns.upgrade_version().await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        // Introspection bookkeeping is populated later, as introspection
        // collections get registered.
        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        // Watch channel for broadcasting statistics-interval changes; the
        // initial receiver is unused, subscribers are created on demand.
        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        // Periodic maintenance tick; skip (rather than burst) missed ticks.
        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        // Used as the baseline for wallclock-lag recording.
        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }
2888
    // This is different from `set_read_policies`, which is for external users.
    // This method is for setting the policy that the controller uses when
    // maintaining the read holds that it has for collections/exports at the
    // StorageCollections.
    //
    // This is really only used when dropping things, where we set the
    // ReadPolicy to the empty Antichain.
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        // Net changes to read capabilities, accumulated here and applied in
        // one batch at the end.
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                // Split-borrow the per-kind state: the current write frontier,
                // the since derived from the old policy, and the slot holding
                // the policy itself.
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                // Re-derive the since under the new policy and record the
                // delta against the previously derived since, if any.
                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                // Install the new policy for future frontier updates.
                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }
2933
    // Record a new write frontier (upper) for `id` and propagate any implied
    // read-capability changes to our dependency read holds.
    #[instrument(level = "debug", fields(updates))]
    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
        let mut read_capability_changes = BTreeMap::default();

        if let Some(collection) = self.collections.get_mut(&id) {
            // Split-borrow the per-kind state we need.
            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => (
                    &mut ingestion.write_frontier,
                    &mut ingestion.derived_since,
                    &ingestion.hold_policy,
                ),
                CollectionStateExtra::None => {
                    if matches!(collection.data_source, DataSource::Progress) {
                        // We do get these, but can't do anything with it!
                    } else {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
                CollectionStateExtra::Export(export) => (
                    &mut export.write_frontier,
                    &mut export.derived_since,
                    &export.read_policy,
                ),
            };

            // Only ever advance the recorded write frontier; regressions are
            // ignored.
            if PartialOrder::less_than(write_frontier, new_upper) {
                write_frontier.clone_from(new_upper);
            }

            // Re-derive the since from the (unchanged) hold policy against the
            // possibly-advanced write frontier and record the delta, if any.
            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
            let mut update = swap_updates(derived_since, new_derived_since);
            if !update.is_empty() {
                read_capability_changes.insert(id, update);
            }
        } else if self.dropped_objects.contains_key(&id) {
            // We dropped an object but might still get updates from cluster
            // side, before it notices the drop. This is expected and fine.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }
2984
    // This is different from `update_read_capabilities`, which is for external users.
    // This method is for maintaining the read holds that the controller has at
    // the StorageCollections, for storage dependencies.
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        // Location to record consequences that we need to act on.
        // Maps id -> (net capability changes, new overall frontier, cluster).
        let mut collections_net = BTreeMap::new();

        // We must not rely on any specific relative ordering of `GlobalId`s.
        // That said, it is reasonable to assume that collections generally have
        // greater IDs than their dependencies, so starting with the largest is
        // a useful optimization.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        // Apply the changes to the ingestion's read
                        // capabilities; `update_iter` yields the resulting
                        // changes to the overall frontier, which we fold back
                        // into `update` for the accounting below.
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        // WIP: See if this ever panics in ci.
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        // Same bookkeeping as the ingestion arm, but against
                        // the export's capabilities and cluster.
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                // This is confusing and we should probably error.
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Translate our net compute actions into `AllowCompaction` commands and
        // downgrade persist sinces.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                // Downgrade the holds on all dependencies to the new overall
                // frontier.
                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                // Send AllowCompaction command directly to the instance
                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }
3093
3094    /// Validate that a collection exists for all identifiers, and error if any do not.
3095    fn validate_collection_ids(
3096        &self,
3097        ids: impl Iterator<Item = GlobalId>,
3098    ) -> Result<(), StorageError<T>> {
3099        for id in ids {
3100            self.storage_collections.check_exists(id)?;
3101        }
3102        Ok(())
3103    }
3104
    /// Validate that an export exists for all identifiers, and error if any do
    /// not.
    ///
    /// (Doc fixed: the previous text was copy-pasted from
    /// `validate_collection_ids` and incorrectly said "collection".)
    fn validate_export_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.export(id)?;
        }
        Ok(())
    }
3115
    /// Opens a persist write handle for the given `shard` and fetches its most
    /// recent upper before returning it.
    ///
    /// NOTE(review): the previous doc described also opening a critical since
    /// handle, a `since` argument, and a `halt!` on epoch conflicts — none of
    /// which exist in this body anymore; it only opens a write handle.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        // Diagnostics identify this handle in persist logs/introspection.
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // N.B.
        // Fetch the most recent upper for the write handle, since a freshly
        // opened handle's cached upper may be stale. The original comment
        // insisted this happen AFTER creating a since handle so the two are
        // linearized, but no since handle is opened here anymore —
        // NOTE(review): confirm whether that ordering concern is obsolete.
        //
        // Note that this returns the upper, but also sets it on the handle to be fetched later.
        write.fetch_recent_upper().await;

        write
    }
3157
    /// Registers the given introspection collection and does any preparatory
    /// work that we have to do before we start writing to it. This
    /// preparatory work will include partial truncation or other cleanup
    /// schemes, depending on introspection type.
    ///
    /// # Panics
    /// - If an ID is already registered for this `introspection_type`.
    /// - If `force_writable` applies to a non-system `id`.
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // In read-only mode we create a new shard for all migrated storage collections. So we
        // "trick" the write task into thinking that it's not in read-only mode so something is
        // advancing this new shard.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        // Record the type -> id mapping; each introspection type may only be
        // registered once.
        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        // Closure producing a fresh leased read handle on demand; used by the
        // differential collection machinery to snapshot existing contents.
        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            // For these, we first register the collection and then prepare it,
            // because the code that prepares differential collection expects to
            // be able to update desired state via the collection manager
            // already.
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                // These do a shallow copy.
                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            // For these, we first have to prepare and then register with
            // collection manager, because the preparation logic wants to read
            // the shard's contents and then do uncontested writes.
            //
            // TODO(aljoscha): We should make the truncation/cleanup work that
            // happens when we take over instead be a periodic thing, and make
            // it resilient to the upper moving concurrently.
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }
3270
3271    /// Remove statistics for sources/sinks that were dropped but still have statistics rows
3272    /// hanging around.
3273    fn reconcile_dangling_statistics(&self) {
3274        self.source_statistics
3275            .lock()
3276            .expect("poisoned")
3277            .source_statistics
3278            // collections should also contain subsources.
3279            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3280        self.sink_statistics
3281            .lock()
3282            .expect("poisoned")
3283            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3284    }
3285
3286    /// Appends a new global ID, shard ID pair to the appropriate collection.
3287    /// Use a `diff` of 1 to append a new entry; -1 to retract an existing
3288    /// entry.
3289    ///
3290    /// # Panics
3291    /// - If `self.collections` does not have an entry for `global_id`.
3292    /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
3293    ///   a managed collection.
3294    /// - If diff is any value other than `1` or `-1`.
3295    #[instrument(level = "debug")]
3296    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3297    where
3298        I: Iterator<Item = GlobalId>,
3299    {
3300        mz_ore::soft_assert_or_log!(
3301            diff == Diff::MINUS_ONE || diff == Diff::ONE,
3302            "use 1 for insert or -1 for delete"
3303        );
3304
3305        let id = *self
3306            .introspection_ids
3307            .get(&IntrospectionType::ShardMapping)
3308            .expect("should be registered before this call");
3309
3310        let mut updates = vec![];
3311        // Pack updates into rows
3312        let mut row_buf = Row::default();
3313
3314        for global_id in global_ids {
3315            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3316                collection.collection_metadata.data_shard.clone()
3317            } else {
3318                panic!("unknown global id: {}", global_id);
3319            };
3320
3321            let mut packer = row_buf.packer();
3322            packer.push(Datum::from(global_id.to_string().as_str()));
3323            packer.push(Datum::from(shard_id.to_string().as_str()));
3324            updates.push((row_buf.clone(), diff));
3325        }
3326
3327        self.collection_manager.differential_append(id, updates);
3328    }
3329
3330    /// Determines and returns this collection's dependencies, if any.
3331    fn determine_collection_dependencies(
3332        &self,
3333        self_id: GlobalId,
3334        collection_desc: &CollectionDescription<T>,
3335    ) -> Result<Vec<GlobalId>, StorageError<T>> {
3336        let mut dependencies = Vec::new();
3337
3338        if let Some(id) = collection_desc.primary {
3339            dependencies.push(id);
3340        }
3341
3342        match &collection_desc.data_source {
3343            DataSource::Introspection(_)
3344            | DataSource::Webhook
3345            | DataSource::Table
3346            | DataSource::Progress
3347            | DataSource::Other => (),
3348            DataSource::IngestionExport { ingestion_id, .. } => {
3349                // Ingestion exports depend on their primary source's remap
3350                // collection.
3351                let source_collection = self.collection(*ingestion_id)?;
3352                let ingestion_remap_collection_id = match &source_collection.data_source {
3353                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3354                    _ => unreachable!(
3355                        "SourceExport must only refer to primary sources that already exist"
3356                    ),
3357                };
3358
3359                // Ingestion exports (aka. subsources) must make sure that 1)
3360                // their own collection's since stays one step behind the upper,
3361                // and, 2) that the remap shard's since stays one step behind
3362                // their upper. Hence they track themselves and the remap shard
3363                // as dependencies.
3364                dependencies.extend([self_id, ingestion_remap_collection_id]);
3365            }
3366            // Ingestions depend on their remap collection.
3367            DataSource::Ingestion(ingestion) => {
3368                // Ingestions must make sure that 1) their own collection's
3369                // since stays one step behind the upper, and, 2) that the remap
3370                // shard's since stays one step behind their upper. Hence they
3371                // track themselves and the remap shard as dependencies.
3372                dependencies.push(self_id);
3373                if self_id != ingestion.remap_collection_id {
3374                    dependencies.push(ingestion.remap_collection_id);
3375                }
3376            }
3377            DataSource::Sink { desc } => {
3378                // Sinks hold back their own frontier and the frontier of their input.
3379                dependencies.extend([self_id, desc.sink.from]);
3380            }
3381        };
3382
3383        Ok(dependencies)
3384    }
3385
3386    async fn read_handle_for_snapshot(
3387        &self,
3388        id: GlobalId,
3389    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3390        let metadata = self.storage_collections.collection_metadata(id)?;
3391        read_handle_for_snapshot(&self.persist, id, &metadata).await
3392    }
3393
3394    /// Handles writing of status updates for sources/sinks to the appropriate
3395    /// status relation
3396    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3397        if self.read_only {
3398            return;
3399        }
3400
3401        let mut sink_status_updates = vec![];
3402        let mut source_status_updates = vec![];
3403
3404        for update in updates {
3405            let id = update.id;
3406            if self.export(id).is_ok() {
3407                sink_status_updates.push(update);
3408            } else if self.storage_collections.check_exists(id).is_ok() {
3409                source_status_updates.push(update);
3410            }
3411        }
3412
3413        self.append_status_introspection_updates(
3414            IntrospectionType::SourceStatusHistory,
3415            source_status_updates,
3416        );
3417        self.append_status_introspection_updates(
3418            IntrospectionType::SinkStatusHistory,
3419            sink_status_updates,
3420        );
3421    }
3422
3423    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3424        self.collections
3425            .get(&id)
3426            .ok_or(StorageError::IdentifierMissing(id))
3427    }
3428
    /// Runs the identified ingestion using the current definition of the
    /// ingestion in-memory.
    ///
    /// Errors when `id` is unknown, is not an ingestion, or when its storage
    /// instance is missing.
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich all of the exports with their metadata
        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            // Each export must already have its own collection registered.
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        // Rebuild the description with collection metadata attached for the
        // exports and the remap collection.
        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            // The rest of the fields are identical
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        // Fetch the client for this ingestion's instance.
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        // Hand the enriched description to the instance for execution.
        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }
3483
3484    /// Runs the identified export using the current definition of the export
3485    /// that we have in memory.
3486    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3487        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3488            return Err(StorageError::IdentifierMissing(id));
3489        };
3490
3491        let from_storage_metadata = self
3492            .storage_collections
3493            .collection_metadata(description.sink.from)?;
3494        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3495
3496        // Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
3497        // is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
3498        // reading it; otherwise assume we may have to replay from the beginning.
3499        let export_state = self.storage_collections.collection_frontiers(id)?;
3500        let mut as_of = description.sink.as_of.clone();
3501        as_of.join_assign(&export_state.implied_capability);
3502        let with_snapshot = description.sink.with_snapshot
3503            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);
3504
3505        info!(
3506            sink_id = %id,
3507            from_id = %description.sink.from,
3508            write_frontier = ?export_state.write_frontier,
3509            ?as_of,
3510            ?with_snapshot,
3511            "run_export"
3512        );
3513
3514        let cmd = RunSinkCommand {
3515            id,
3516            description: StorageSinkDesc {
3517                from: description.sink.from,
3518                from_desc: description.sink.from_desc.clone(),
3519                connection: description.sink.connection.clone(),
3520                envelope: description.sink.envelope,
3521                as_of,
3522                version: description.sink.version,
3523                from_storage_metadata,
3524                with_snapshot,
3525                to_storage_metadata,
3526                commit_interval: description.sink.commit_interval,
3527            },
3528        };
3529
3530        let storage_instance_id = description.instance_id.clone();
3531
3532        let instance = self
3533            .instances
3534            .get_mut(&storage_instance_id)
3535            .ok_or_else(|| StorageError::ExportInstanceMissing {
3536                storage_instance_id,
3537                export_id: id,
3538            })?;
3539
3540        instance.send(StorageCommand::RunSink(Box::new(cmd)));
3541
3542        Ok(())
3543    }
3544
    /// Update introspection with the current frontiers of storage objects.
    ///
    /// Computes the diff between the currently observed frontiers and the ones recorded at the
    /// last invocation, and appends the resulting insertions/retractions to the `Frontiers` and
    /// `ReplicaFrontiers` introspection collections.
    ///
    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
    /// second during normal operation.
    fn update_frontier_introspection(&mut self) {
        // Snapshot the current frontiers of all active collections, plus per-replica write
        // frontiers for collections that run on a storage instance.
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            // Find the instance (if any) this collection runs on, so its write frontier can be
            // attributed to each of that instance's replicas.
            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    // NOTE(review): the same `upper` is reported for every replica of the
                    // instance; per-replica frontiers are not tracked separately here.
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        // Encode a `(collection, since, upper)` entry as a row update for the `Frontiers`
        // introspection relation.
        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                // An empty frontier has no representative time; encode it as NULL.
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        // Encode a per-replica write frontier as a row update for the `ReplicaFrontiers`
        // introspection relation.
        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Diff the newly observed frontiers against the previously recorded ones: emit an
        // insertion for every new or changed entry and a retraction for every stale one.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        // Entries that remain were dropped since the last refresh; retract them.
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        // Same diffing scheme for the per-replica frontiers.
        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        // Apply the accumulated diffs to the introspection collections.
        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }
3643
    /// Refresh the wallclock lag introspection and metrics with the current lag values.
    ///
    /// This method produces wallclock lag metrics of two different shapes:
    ///
    /// * Histories: For each replica and each collection, we measure the lag of the write frontier
    ///   behind the wallclock time every second. Every minute we emit the maximum lag observed
    ///   over the last minute, together with the current time.
    /// * Histograms: For each collection, we measure the lag of the write frontier behind
    ///   wallclock time every second. Every minute we emit all lags observed over the last minute,
    ///   together with the current histogram period.
    ///
    /// Histories are emitted to both Mz introspection and Prometheus, histograms only to
    /// introspection. We treat lags of unreadable collections (i.e. collections that contain no
    /// readable times) as undefined and set them to NULL in introspection and `u64::MAX` in
    /// Prometheus.
    ///
    /// This method is invoked by `Controller::maintain`, which we expect to be called once per
    /// second during normal operation.
    fn refresh_wallclock_lag(&mut self) {
        // Quantize "now" into the currently configured histogram period.
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        // Lag of a frontier behind the wallclock; an empty (closed) frontier is treated as
        // fully caught up.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // A collection is unreadable if it contains no readable times, i.e. its write
            // frontier has not advanced beyond its read frontier.
            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            // Accumulate the maximum lag for the next `WallclockLagHistory` update.
            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // No way to specify values as undefined in Prometheus metrics, so we use the
            // maximum value instead.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            // Histogram data is only stashed for collections that track it (see
            // `CollectionState::new`).
            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by powers of two to bound the number of histogram entries.
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                // Attach the workload class of the collection's instance (if any) as a label.
                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        // Record lags to persist, if it's time.
        self.maybe_record_wallclock_lag();
    }
3718
    /// Produce new wallclock lag introspection updates, provided enough time has passed since the
    /// last recording.
    ///
    /// We emit new introspection updates if the system time has passed into a new multiple of the
    /// recording interval (typically 1 minute) since the last refresh. The compute controller uses
    /// the same approach, ensuring that both controllers commit their lags at roughly the same
    /// time, avoiding confusion caused by inconsistencies.
    fn maybe_record_wallclock_lag(&mut self) {
        // In read-only mode we must not write to introspection collections.
        if self.read_only {
            return;
        }

        // Truncate a datetime down to the nearest multiple of `interval`; `None` if the
        // interval cannot be represented as a `TimeDelta`.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        // Fall back to the default interval if the configured one is unusable.
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Only record when we've crossed into a new interval multiple since the last recording.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // Take the maximum lag observed since the last recording, resetting the accumulator.
            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                // NOTE(review): presumably the replica column, NULL for a collection-level
                // measurement — confirm against the relation desc.
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            // Histogram data is only stashed for collections that track it.
            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            // Drain the stash into histogram rows, one per (period, bucket, labels) key.
            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
3801
3802    /// Run periodic tasks.
3803    ///
3804    /// This method is invoked roughly once per second during normal operation. It is a good place
3805    /// for tasks that need to run periodically, such as state cleanup or updating of metrics.
3806    fn maintain(&mut self) {
3807        self.update_frontier_introspection();
3808        self.refresh_wallclock_lag();
3809
3810        // Perform instance maintenance work.
3811        for instance in self.instances.values_mut() {
3812            instance.refresh_state_metrics();
3813        }
3814    }
3815}
3816
/// Maps each introspection type to the kind of collection manager that maintains it:
/// differentially-updated collections vs. append-only histories.
impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            // Collections whose contents are maintained as a changing set of rows, via
            // insertions and retractions.
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            // History-style collections that only ever receive new rows.
            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}
3846
3847/// Get the current rows in the given statistics table. This is used to bootstrap
3848/// the statistics tasks.
3849///
3850// TODO(guswynn): we need to be more careful about the update time we get here:
3851// <https://github.com/MaterializeInc/database-issues/issues/7564>
3852async fn snapshot_statistics<T>(
3853    id: GlobalId,
3854    upper: Antichain<T>,
3855    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3856) -> Vec<Row>
3857where
3858    T: Codec64 + From<EpochMillis> + TimestampManipulation,
3859{
3860    match upper.as_option() {
3861        Some(f) if f > &T::minimum() => {
3862            let as_of = f.step_back().unwrap();
3863
3864            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3865            snapshot
3866                .into_iter()
3867                .map(|(row, diff)| {
3868                    assert_eq!(diff, 1);
3869                    row
3870                })
3871                .collect()
3872        }
3873        // If collection is closed or the frontier is the minimum, we cannot
3874        // or don't need to truncate (respectively).
3875        _ => Vec::new(),
3876    }
3877}
3878
3879async fn read_handle_for_snapshot<T>(
3880    persist: &PersistClientCache,
3881    id: GlobalId,
3882    metadata: &CollectionMetadata,
3883) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3884where
3885    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3886{
3887    let persist_client = persist
3888        .open(metadata.persist_location.clone())
3889        .await
3890        .unwrap();
3891
3892    // We create a new read handle every time someone requests a snapshot and then immediately
3893    // expire it instead of keeping a read handle permanently in our state to avoid having it
3894    // heartbeat continously. The assumption is that calls to snapshot are rare and therefore
3895    // worth it to always create a new handle.
3896    let read_handle = persist_client
3897        .open_leased_reader::<SourceData, (), _, _>(
3898            metadata.data_shard,
3899            Arc::new(metadata.relation_desc.clone()),
3900            Arc::new(UnitSchema),
3901            Diagnostics {
3902                shard_name: id.to_string(),
3903                handle_purpose: format!("snapshot {}", id),
3904            },
3905            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3906        )
3907        .await
3908        .expect("invalid persist usage");
3909    Ok(read_handle)
3910}
3911
/// State maintained about individual collections.
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The source of this collection's data.
    pub data_source: DataSource<T>,

    /// Metadata describing how the collection is stored (e.g. its persist location, data
    /// shard, and relation desc — see `read_handle_for_snapshot` for how these are used).
    pub collection_metadata: CollectionMetadata,

    /// Additional per-type state; see [`CollectionStateExtra`].
    pub extra_state: CollectionStateExtra<T>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
    wallclock_lag_max: WallclockLag,
    /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
    /// introspection update.
    ///
    /// Keys are `(period, lag, labels)` triples, values are counts.
    ///
    /// If this is `None`, wallclock lag is not tracked for this collection.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    /// Frontier wallclock lag metrics tracked for this collection.
    wallclock_lag_metrics: WallclockLagMetrics,
}
3943
3944impl<T: TimelyTimestamp> CollectionState<T> {
3945    fn new(
3946        data_source: DataSource<T>,
3947        collection_metadata: CollectionMetadata,
3948        extra_state: CollectionStateExtra<T>,
3949        wallclock_lag_metrics: WallclockLagMetrics,
3950    ) -> Self {
3951        // Only collect wallclock lag histogram data for collections written by storage, to avoid
3952        // duplicate measurements. Collections written by other components (e.g. compute) have
3953        // their wallclock lags recorded by these components.
3954        let wallclock_lag_histogram_stash = match &data_source {
3955            DataSource::Other => None,
3956            _ => Some(Default::default()),
3957        };
3958
3959        Self {
3960            data_source,
3961            collection_metadata,
3962            extra_state,
3963            wallclock_lag_max: WallclockLag::MIN,
3964            wallclock_lag_histogram_stash,
3965            wallclock_lag_metrics,
3966        }
3967    }
3968}
3969
/// Additional state that the controller maintains for select collection types.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    /// State for ingestions and ingestion exports; see [`IngestionState`].
    Ingestion(IngestionState<T>),
    /// State for sink exports.
    Export(ExportState<T>),
    /// No additional state is maintained for this collection type.
    None,
}
3977
/// State maintained about ingestions and ingestion exports
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Accumulated read capabilities on this ingestion.
    ///
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The current since frontier, derived from `write_frontier` using
    /// `hold_policy`.
    pub derived_since: Antichain<T>,

    /// Holds that this ingestion (or ingestion export) has on its dependencies.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// The policy that drives how we downgrade our read hold. That is how we
    /// derive our since from our upper.
    ///
    /// This is a _storage-controller-internal_ policy used to derive its
    /// personal read hold on the collection. It should not be confused with any
    /// read policies that the adapter might install at [StorageCollections].
    pub hold_policy: ReadPolicy<T>,

    /// The ID of the instance in which the ingestion is running.
    pub instance_id: StorageInstanceId,

    /// Set of replica IDs on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}
4008
/// A description of a status history collection.
///
/// Used to inform partial truncation, see
/// [`collection_mgmt::partially_truncate_status_history`].
struct StatusHistoryDesc<K> {
    /// The policy determining which updates are retained per key.
    retention_policy: StatusHistoryRetentionPolicy,
    /// Extracts the truncation key (e.g. object/replica ID) from a row's datums.
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    /// Extracts the event time (`occurred_at`) from a row's datums.
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}
/// How many status history updates to retain per key during partial truncation.
enum StatusHistoryRetentionPolicy {
    /// Truncates everything but the last N updates for each key.
    LastN(usize),
    /// Truncates everything past the time window for each key.
    TimeWindow(Duration),
}
4024
4025fn source_status_history_desc(
4026    params: &StorageParameters,
4027) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
4028    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
4029    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
4030    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4031    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4032
4033    StatusHistoryDesc {
4034        retention_policy: StatusHistoryRetentionPolicy::LastN(
4035            params.keep_n_source_status_history_entries,
4036        ),
4037        extract_key: Box::new(move |datums| {
4038            (
4039                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
4040                if datums[replica_id_idx].is_null() {
4041                    None
4042                } else {
4043                    Some(
4044                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4045                            .expect("ReplicaId column"),
4046                    )
4047                },
4048            )
4049        }),
4050        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4051    }
4052}
4053
4054fn sink_status_history_desc(
4055    params: &StorageParameters,
4056) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
4057    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
4058    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
4059    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4060    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4061
4062    StatusHistoryDesc {
4063        retention_policy: StatusHistoryRetentionPolicy::LastN(
4064            params.keep_n_sink_status_history_entries,
4065        ),
4066        extract_key: Box::new(move |datums| {
4067            (
4068                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
4069                if datums[replica_id_idx].is_null() {
4070                    None
4071                } else {
4072                    Some(
4073                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4074                            .expect("ReplicaId column"),
4075                    )
4076                },
4077            )
4078        }),
4079        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4080    }
4081}
4082
4083fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
4084    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
4085    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
4086    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4087
4088    StatusHistoryDesc {
4089        retention_policy: StatusHistoryRetentionPolicy::LastN(
4090            params.keep_n_privatelink_status_history_entries,
4091        ),
4092        extract_key: Box::new(move |datums| {
4093            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4094        }),
4095        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4096    }
4097}
4098
4099fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4100    let desc = &REPLICA_STATUS_HISTORY_DESC;
4101    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4102    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4103    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4104
4105    StatusHistoryDesc {
4106        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4107            params.replica_status_history_retention_window,
4108        ),
4109        extract_key: Box::new(move |datums| {
4110            (
4111                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4112                datums[process_idx].unwrap_uint64(),
4113            )
4114        }),
4115        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4116    }
4117}
4118
/// Replace one antichain with another, tracking the overall changes in the returned `ChangeBatch`.
///
/// Note that the replacement only happens when `from` is less than or equal to `replace_with`,
/// i.e. when the frontier does not regress; otherwise `from` is left untouched and the returned
/// batch is empty.
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        // Record additions for the new frontier, swap so `replace_with` holds the old times,
        // then record retractions for those.
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}