1use std::any::Any;
13use std::collections::btree_map;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::{Debug, Display};
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use crate::collection_mgmt::{
21 AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
22};
23use crate::instance::{Instance, ReplicaConfig};
24use async_trait::async_trait;
25use chrono::{DateTime, DurationRound, TimeDelta, Utc};
26use derivative::Derivative;
27use differential_dataflow::lattice::Lattice;
28use futures::FutureExt;
29use futures::StreamExt;
30use itertools::Itertools;
31use mz_build_info::BuildInfo;
32use mz_cluster_client::client::ClusterReplicaLocation;
33use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_controller_types::dyncfgs::{
36 ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
37};
38use mz_ore::collections::CollectionExt;
39use mz_ore::metrics::MetricsRegistry;
40use mz_ore::now::{EpochMillis, NowFn};
41use mz_ore::task::AbortOnDropHandle;
42use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
43use mz_persist_client::batch::ProtoBatch;
44use mz_persist_client::cache::PersistClientCache;
45use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
46use mz_persist_client::critical::Opaque;
47use mz_persist_client::read::ReadHandle;
48use mz_persist_client::schema::CaESchema;
49use mz_persist_client::write::WriteHandle;
50use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
51use mz_persist_types::Codec64;
52use mz_persist_types::codec_impls::UnitSchema;
53use mz_repr::adt::timestamp::CheckedTimestamp;
54use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
55use mz_storage_client::client::{
56 AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
57 StatusUpdate, StorageCommand, StorageResponse, TableData,
58};
59use mz_storage_client::controller::{
60 BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
61 IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
62 StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
63};
64use mz_storage_client::healthcheck::{
65 MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
66 MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
67};
68use mz_storage_client::metrics::StorageControllerMetrics;
69use mz_storage_client::statistics::{
70 ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
71};
72use mz_storage_client::storage_collections::StorageCollections;
73use mz_storage_types::configuration::StorageConfiguration;
74use mz_storage_types::connections::ConnectionContext;
75use mz_storage_types::connections::inline::InlinedConnection;
76use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
77use mz_storage_types::errors::CollectionMissing;
78use mz_storage_types::instances::StorageInstanceId;
79use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
80use mz_storage_types::parameters::StorageParameters;
81use mz_storage_types::read_holds::ReadHold;
82use mz_storage_types::read_policy::ReadPolicy;
83use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
84use mz_storage_types::sources::{
85 GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
86 SourceExport, SourceExportDataConfig,
87};
88use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
89use mz_txn_wal::metrics::Metrics as TxnMetrics;
90use mz_txn_wal::txn_read::TxnsRead;
91use mz_txn_wal::txns::TxnsHandle;
92use timely::order::{PartialOrder, TotalOrder};
93use timely::progress::Timestamp as TimelyTimestamp;
94use timely::progress::frontier::MutableAntichain;
95use timely::progress::{Antichain, ChangeBatch, Timestamp};
96use tokio::sync::watch::{Sender, channel};
97use tokio::sync::{mpsc, oneshot};
98use tokio::time::MissedTickBehavior;
99use tokio::time::error::Elapsed;
100use tracing::{debug, info, warn};
101
102mod collection_mgmt;
103mod history;
104mod instance;
105mod persist_handles;
106mod rtr;
107mod statistics;
108
/// A oneshot ingestion that is currently running and whose result has not yet
/// been delivered to the caller.
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to deliver the ingestion's result (or a cancellation
    /// error) back to the waiting caller.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// The storage instance (cluster) the ingestion is running on.
    cluster_id: StorageInstanceId,
}
118
119impl PendingOneshotIngestion {
120 pub(crate) fn cancel(self) {
124 (self.result_tx)(vec![Err("canceled".to_string())])
125 }
126}
127
/// The storage controller: tracks collection state, storage instances, and the
/// machinery (persist handles, statistics, introspection) used to manage them.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// Build information for this process.
    build_info: &'static BuildInfo,
    /// Clock used to produce wall-clock timestamps (e.g. for status updates
    /// emitted in `drop_replica`).
    now: NowFn,
    /// Whether the controller runs in read-only mode. In this mode instances
    /// are never sent `AllowWrites` and sources/sinks are not started (except
    /// for 0dt-capable sources).
    read_only: bool,
    /// State of all collections known to this controller, keyed by id.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,
    /// For dropped objects, the replicas we still expect to hear from.
    /// NOTE(review): semantics inferred from how `drop_replica` prunes this
    /// map — confirm against the insertion site.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,
    /// Worker handling writes to table (txn-backed) shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// Read side of the txn-wal machinery.
    txns_read: TxnsRead<T>,
    /// Metrics for txn-wal.
    txns_metrics: Arc<TxnMetrics>,
    /// Responses from instances (tagged with the sending replica, if any)
    /// stashed for later processing.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Sender for table write handles that should be dropped.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Receiver side of `pending_table_handle_drops_tx`.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// In-flight oneshot ingestions, keyed by ingestion uuid.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,
    /// Manager for controller-maintained (append-only/differential)
    /// collections.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
    /// The collection id backing each introspection type.
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Opaque tokens tied to introspection collections; presumably dropping a
    /// token tears down the associated machinery — confirm at the use site.
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
    /// Shared source statistics state (includes webhook statistics).
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Shared sink statistics, keyed by (sink id, replica).
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// Watch channel broadcasting the configured statistics interval (updated
    /// in `update_parameters`).
    statistics_interval_sender: Sender<Duration>,
    /// All storage instances, keyed by id.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Whether `initialization_complete` has been called.
    initialized: bool,
    /// Current storage configuration parameters.
    config: StorageConfiguration,
    /// Location of the persist store used by this controller.
    persist_location: PersistLocation,
    /// Cache of persist clients.
    persist: Arc<PersistClientCache>,
    /// Metrics published by this controller.
    metrics: StorageControllerMetrics,
    /// Frontier pairs last recorded per collection (presumably
    /// (read, write) — confirm against the recording site).
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// Per-replica frontiers last recorded.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,
    /// Function used to compute wallclock lag from a frontier.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// When wallclock lag was last recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,
    /// Handle to the shared collection frontier/metadata tracking layer.
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Collections migrated during bootstrap (see
    /// `create_collections_for_bootstrap`).
    migrated_storage_collections: BTreeSet<GlobalId>,
    /// Ticker driving periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether a maintenance run is currently scheduled.
    maintenance_scheduled: bool,
    /// Sender handed to instances so they can report responses back to the
    /// controller.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receiver for instance responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Background task warming persist state; aborted-on-drop once collections
    /// are created during bootstrap.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}
242
243fn warm_persist_state_in_background(
248 client: PersistClient,
249 shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
250) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
251 const MAX_CONCURRENT_WARMS: usize = 16;
253 let logic = async move {
254 let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
255 .map(|shard_id| {
256 let client = client.clone();
257 async move {
258 client
259 .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
260 shard_id,
261 Arc::new(RelationDesc::empty()),
262 Arc::new(UnitSchema),
263 true,
264 Diagnostics::from_purpose("warm persist load state"),
265 )
266 .await
267 }
268 })
269 .buffer_unordered(MAX_CONCURRENT_WARMS)
270 .collect()
271 .await;
272 let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
273 fetchers
274 };
275 mz_ore::task::spawn(|| "warm_persist_load_state", logic)
276}
277
278#[async_trait(?Send)]
279impl<T> StorageController for Controller<T>
280where
281 T: Timestamp
282 + Lattice
283 + TotalOrder
284 + Codec64
285 + From<EpochMillis>
286 + TimestampManipulation
287 + Into<Datum<'static>>
288 + Display,
289{
290 type Timestamp = T;
291
292 fn initialization_complete(&mut self) {
293 self.reconcile_dangling_statistics();
294 self.initialized = true;
295
296 for instance in self.instances.values_mut() {
297 instance.send(StorageCommand::InitializationComplete);
298 }
299 }
300
301 fn update_parameters(&mut self, config_params: StorageParameters) {
302 self.storage_collections
303 .update_parameters(config_params.clone());
304
305 self.persist.cfg().apply_from(&config_params.dyncfg_updates);
308
309 for instance in self.instances.values_mut() {
310 let params = Box::new(config_params.clone());
311 instance.send(StorageCommand::UpdateConfiguration(params));
312 }
313 self.config.update(config_params);
314 self.statistics_interval_sender
315 .send_replace(self.config.parameters.statistics_interval);
316 self.collection_manager.update_user_batch_duration(
317 self.config
318 .parameters
319 .user_storage_managed_collections_batch_duration,
320 );
321 }
322
    /// Returns this controller's current storage configuration.
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }
327
    /// Returns the metadata for collection `id`, delegating to the shared
    /// storage-collections layer.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        self.storage_collections.collection_metadata(id)
    }
331
332 fn collection_hydrated(
333 &self,
334 collection_id: GlobalId,
335 ) -> Result<bool, StorageError<Self::Timestamp>> {
336 let collection = self.collection(collection_id)?;
337
338 let instance_id = match &collection.data_source {
339 DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
340 DataSource::IngestionExport { ingestion_id, .. } => {
341 let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
342
343 let instance_id = match &ingestion_state.data_source {
344 DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
345 _ => unreachable!("SourceExport must only refer to primary source"),
346 };
347
348 instance_id
349 }
350 _ => return Ok(true),
351 };
352
353 let instance = self.instances.get(&instance_id).ok_or_else(|| {
354 StorageError::IngestionInstanceMissing {
355 storage_instance_id: instance_id,
356 ingestion_id: collection_id,
357 }
358 })?;
359
360 if instance.replica_ids().next().is_none() {
361 return Ok(true);
364 }
365
366 match &collection.extra_state {
367 CollectionStateExtra::Ingestion(ingestion_state) => {
368 Ok(ingestion_state.hydrated_on.len() >= 1)
370 }
371 CollectionStateExtra::Export(_) => {
372 Ok(true)
377 }
378 CollectionStateExtra::None => {
379 Ok(true)
383 }
384 }
385 }
386
387 #[mz_ore::instrument(level = "debug")]
388 fn collections_hydrated_on_replicas(
389 &self,
390 target_replica_ids: Option<Vec<ReplicaId>>,
391 target_cluster_id: &StorageInstanceId,
392 exclude_collections: &BTreeSet<GlobalId>,
393 ) -> Result<bool, StorageError<Self::Timestamp>> {
394 if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
397 return Ok(true);
398 }
399
400 let target_replicas: Option<BTreeSet<ReplicaId>> =
403 target_replica_ids.map(|ids| ids.into_iter().collect());
404
405 let mut all_hydrated = true;
406 for (collection_id, collection_state) in self.collections.iter() {
407 if collection_id.is_transient() || exclude_collections.contains(collection_id) {
408 continue;
409 }
410 let hydrated = match &collection_state.extra_state {
411 CollectionStateExtra::Ingestion(state) => {
412 if &state.instance_id != target_cluster_id {
413 continue;
414 }
415 match &target_replicas {
416 Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
417 None => {
418 state.hydrated_on.len() >= 1
421 }
422 }
423 }
424 CollectionStateExtra::Export(_) => {
425 true
430 }
431 CollectionStateExtra::None => {
432 true
436 }
437 };
438 if !hydrated {
439 tracing::info!(%collection_id, "collection is not hydrated on any replica");
440 all_hydrated = false;
441 }
444 }
445 Ok(all_hydrated)
446 }
447
448 fn collection_frontiers(
449 &self,
450 id: GlobalId,
451 ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing> {
452 let frontiers = self.storage_collections.collection_frontiers(id)?;
453 Ok((frontiers.implied_capability, frontiers.write_frontier))
454 }
455
456 fn collections_frontiers(
457 &self,
458 mut ids: Vec<GlobalId>,
459 ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, CollectionMissing> {
460 let mut result = vec![];
461 ids.retain(|&id| match self.export(id) {
466 Ok(export) => {
467 result.push((
468 id,
469 export.input_hold().since().clone(),
470 export.write_frontier.clone(),
471 ));
472 false
473 }
474 Err(_) => true,
475 });
476 result.extend(
477 self.storage_collections
478 .collections_frontiers(ids)?
479 .into_iter()
480 .map(|frontiers| {
481 (
482 frontiers.id,
483 frontiers.implied_capability,
484 frontiers.write_frontier,
485 )
486 }),
487 );
488
489 Ok(result)
490 }
491
    /// Returns the metadata of all collections that are still active, as
    /// tracked by the storage-collections layer.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }
495
496 fn active_ingestion_exports(
497 &self,
498 instance_id: StorageInstanceId,
499 ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
500 let active_storage_collections: BTreeMap<_, _> = self
501 .storage_collections
502 .active_collection_frontiers()
503 .into_iter()
504 .map(|c| (c.id, c))
505 .collect();
506
507 let active_exports = self.instances[&instance_id]
508 .active_ingestion_exports()
509 .filter(move |id| {
510 let frontiers = active_storage_collections.get(id);
511 match frontiers {
512 Some(frontiers) => !frontiers.write_frontier.is_empty(),
513 None => {
514 false
516 }
517 }
518 });
519
520 Box::new(active_exports)
521 }
522
    /// Returns an error if collection `id` is not known to the
    /// storage-collections layer.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }
526
527 fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
528 let metrics = self.metrics.for_instance(id);
529 let mut instance = Instance::new(
530 workload_class,
531 metrics,
532 self.now.clone(),
533 self.instance_response_tx.clone(),
534 );
535 if self.initialized {
536 instance.send(StorageCommand::InitializationComplete);
537 }
538 if !self.read_only {
539 instance.send(StorageCommand::AllowWrites);
540 }
541
542 let params = Box::new(self.config.parameters.clone());
543 instance.send(StorageCommand::UpdateConfiguration(params));
544
545 let old_instance = self.instances.insert(id, instance);
546 assert_none!(old_instance, "storage instance {id} already exists");
547 }
548
549 fn drop_instance(&mut self, id: StorageInstanceId) {
550 let instance = self.instances.remove(&id);
551 assert!(instance.is_some(), "storage instance {id} does not exist");
552 }
553
554 fn update_instance_workload_class(
555 &mut self,
556 id: StorageInstanceId,
557 workload_class: Option<String>,
558 ) {
559 let instance = self
560 .instances
561 .get_mut(&id)
562 .unwrap_or_else(|| panic!("instance {id} does not exist"));
563
564 instance.workload_class = workload_class;
565 }
566
567 fn connect_replica(
568 &mut self,
569 instance_id: StorageInstanceId,
570 replica_id: ReplicaId,
571 location: ClusterReplicaLocation,
572 ) {
573 let instance = self
574 .instances
575 .get_mut(&instance_id)
576 .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
577
578 let config = ReplicaConfig {
579 build_info: self.build_info,
580 location,
581 grpc_client: self.config.parameters.grpc_client.clone(),
582 };
583 instance.add_replica(replica_id, config);
584 }
585
    /// Drops a replica from a storage instance, recording `Paused` status
    /// updates for the sources and sinks that were running on it (unless in
    /// read-only mode).
    ///
    /// # Panics
    /// Panics if no instance with `instance_id` exists, or if the instance
    /// references an ingestion unknown to this controller.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        // Template for the `Paused` status update emitted for each object
        // that was running on the dropped replica.
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            // Stop waiting for responses from this replica for dropped
            // objects; forget the object entirely once no replicas remain.
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // An "old style" ingestion has a remap collection distinct from
            // the ingestion itself; its remap collection gets no status
            // update. NOTE(review): inferred from the filter below — confirm.
            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            // Same bookkeeping as for ingestions above.
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        // Status history is only written when we are allowed to write.
        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }
672
    /// During bootstrap, evolves the persisted schema of each given collection
    /// to the provided `RelationDesc` (e.g. nullability changes), using
    /// persist's compare-and-evolve protocol.
    ///
    /// # Errors
    /// Returns `PersistSchemaEvolveRace` if someone else evolved the schema
    /// concurrently, and `PersistInvalidSchemaEvolve` if the requested
    /// evolution is incompatible with the current schema.
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            // Fetch the currently registered schema; shards without one are
            // skipped.
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            // Compare-and-evolve: only succeeds if `schema_id` is still the
            // latest schema.
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    // Someone evolved the schema concurrently.
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }
739
740 #[instrument(name = "storage::create_collections")]
759 async fn create_collections_for_bootstrap(
760 &mut self,
761 storage_metadata: &StorageMetadata,
762 register_ts: Option<Self::Timestamp>,
763 mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
764 migrated_storage_collections: &BTreeSet<GlobalId>,
765 ) -> Result<(), StorageError<Self::Timestamp>> {
766 self.migrated_storage_collections
767 .extend(migrated_storage_collections.iter().cloned());
768
769 self.storage_collections
770 .create_collections_for_bootstrap(
771 storage_metadata,
772 register_ts.clone(),
773 collections.clone(),
774 migrated_storage_collections,
775 )
776 .await?;
777
778 drop(self.persist_warm_task.take());
781
782 collections.sort_by_key(|(id, _)| *id);
787 collections.dedup();
788 for pos in 1..collections.len() {
789 if collections[pos - 1].0 == collections[pos].0 {
790 return Err(StorageError::CollectionIdReused(collections[pos].0));
791 }
792 }
793
794 let enriched_with_metadata = collections
796 .into_iter()
797 .map(|(id, description)| {
798 let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
799
800 let txns_shard = description
803 .data_source
804 .in_txns()
805 .then(|| *self.txns_read.txns_id());
806
807 let metadata = CollectionMetadata {
808 persist_location: self.persist_location.clone(),
809 data_shard,
810 relation_desc: description.desc.clone(),
811 txns_shard,
812 };
813
814 Ok((id, description, metadata))
815 })
816 .collect_vec();
817
818 let persist_client = self
820 .persist
821 .open(self.persist_location.clone())
822 .await
823 .unwrap();
824 let persist_client = &persist_client;
825
826 use futures::stream::{StreamExt, TryStreamExt};
829 let this = &*self;
830 let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
831 .map(|data: Result<_, StorageError<Self::Timestamp>>| {
832 async move {
833 let (id, description, metadata) = data?;
834
835 debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);
838
839 let write = this
840 .open_data_handles(
841 &id,
842 metadata.data_shard,
843 metadata.relation_desc.clone(),
844 persist_client,
845 )
846 .await;
847
848 Ok::<_, StorageError<T>>((id, description, write, metadata))
849 }
850 })
851 .buffer_unordered(50)
853 .try_collect()
866 .await?;
867
868 let mut to_execute = BTreeSet::new();
871 let mut new_collections = BTreeSet::new();
876 let mut table_registers = Vec::with_capacity(to_register.len());
877
878 to_register.sort_by_key(|(id, ..)| *id);
880
881 let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
887 .into_iter()
888 .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);
889 let to_register = tables_to_register
890 .into_iter()
891 .rev()
892 .chain(collections_to_register.into_iter());
893
894 let mut new_webhook_statistic_entries = BTreeSet::new();
898
899 for (id, description, write, metadata) in to_register {
900 let is_in_txns = |id, metadata: &CollectionMetadata| {
901 metadata.txns_shard.is_some()
902 && !(self.read_only && migrated_storage_collections.contains(&id))
903 };
904
905 to_execute.insert(id);
906 new_collections.insert(id);
907
908 let write_frontier = write.upper();
909
910 let storage_dependencies = self.determine_collection_dependencies(id, &description)?;
912
913 let dependency_read_holds = self
914 .storage_collections
915 .acquire_read_holds(storage_dependencies)
916 .expect("can acquire read holds");
917
918 let mut dependency_since = Antichain::from_elem(T::minimum());
919 for read_hold in dependency_read_holds.iter() {
920 dependency_since.join_assign(read_hold.since());
921 }
922
923 let data_source = description.data_source;
924
925 if !dependency_read_holds.is_empty()
934 && !is_in_txns(id, &metadata)
935 && !matches!(&data_source, DataSource::Sink { .. })
936 {
937 if dependency_since.is_empty() {
943 halt!(
944 "dependency since frontier is empty while dependent upper \
945 is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
946 this indicates concurrent deletion of a collection",
947 write_frontier,
948 dependency_read_holds,
949 );
950 }
951
952 if description.primary.is_none() {
970 mz_ore::soft_assert_or_log!(
971 write_frontier.elements() == &[T::minimum()]
972 || write_frontier.is_empty()
973 || PartialOrder::less_than(&dependency_since, write_frontier),
974 "dependency since has advanced past dependent ({id}) upper \n
975 dependent ({id}): upper {:?} \n
976 dependency since {:?} \n
977 dependency read holds: {:?}",
978 write_frontier,
979 dependency_since,
980 dependency_read_holds,
981 );
982 }
983 }
984
985 let mut extra_state = CollectionStateExtra::None;
987 let mut maybe_instance_id = None;
988 match &data_source {
989 DataSource::Introspection(typ) => {
990 debug!(
991 ?data_source, meta = ?metadata,
992 "registering {id} with persist monotonic worker",
993 );
994 self.register_introspection_collection(
1000 id,
1001 *typ,
1002 write,
1003 persist_client.clone(),
1004 )?;
1005 }
1006 DataSource::Webhook => {
1007 debug!(
1008 ?data_source, meta = ?metadata,
1009 "registering {id} with persist monotonic worker",
1010 );
1011 new_webhook_statistic_entries.insert(id);
1014 self.collection_manager
1020 .register_append_only_collection(id, write, false, None);
1021 }
1022 DataSource::IngestionExport {
1023 ingestion_id,
1024 details,
1025 data_config,
1026 } => {
1027 debug!(
1028 ?data_source, meta = ?metadata,
1029 "not registering {id} with a controller persist worker",
1030 );
1031 let ingestion_state = self
1033 .collections
1034 .get_mut(ingestion_id)
1035 .expect("known to exist");
1036
1037 let instance_id = match &mut ingestion_state.data_source {
1038 DataSource::Ingestion(ingestion_desc) => {
1039 ingestion_desc.source_exports.insert(
1040 id,
1041 SourceExport {
1042 storage_metadata: (),
1043 details: details.clone(),
1044 data_config: data_config.clone(),
1045 },
1046 );
1047
1048 ingestion_desc.instance_id
1053 }
1054 _ => unreachable!(
1055 "SourceExport must only refer to primary sources that already exist"
1056 ),
1057 };
1058
1059 to_execute.remove(&id);
1061 to_execute.insert(*ingestion_id);
1062
1063 let ingestion_state = IngestionState {
1064 read_capabilities: MutableAntichain::from(dependency_since.clone()),
1065 dependency_read_holds,
1066 derived_since: dependency_since,
1067 write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1068 hold_policy: ReadPolicy::step_back(),
1069 instance_id,
1070 hydrated_on: BTreeSet::new(),
1071 };
1072
1073 extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1074 maybe_instance_id = Some(instance_id);
1075 }
1076 DataSource::Table => {
1077 debug!(
1078 ?data_source, meta = ?metadata,
1079 "registering {id} with persist table worker",
1080 );
1081 table_registers.push((id, write));
1082 }
1083 DataSource::Progress | DataSource::Other => {
1084 debug!(
1085 ?data_source, meta = ?metadata,
1086 "not registering {id} with a controller persist worker",
1087 );
1088 }
1089 DataSource::Ingestion(ingestion_desc) => {
1090 debug!(
1091 ?data_source, meta = ?metadata,
1092 "not registering {id} with a controller persist worker",
1093 );
1094
1095 let mut dependency_since = Antichain::from_elem(T::minimum());
1096 for read_hold in dependency_read_holds.iter() {
1097 dependency_since.join_assign(read_hold.since());
1098 }
1099
1100 let ingestion_state = IngestionState {
1101 read_capabilities: MutableAntichain::from(dependency_since.clone()),
1102 dependency_read_holds,
1103 derived_since: dependency_since,
1104 write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1105 hold_policy: ReadPolicy::step_back(),
1106 instance_id: ingestion_desc.instance_id,
1107 hydrated_on: BTreeSet::new(),
1108 };
1109
1110 extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1111 maybe_instance_id = Some(ingestion_desc.instance_id);
1112 }
1113 DataSource::Sink { desc } => {
1114 let mut dependency_since = Antichain::from_elem(T::minimum());
1115 for read_hold in dependency_read_holds.iter() {
1116 dependency_since.join_assign(read_hold.since());
1117 }
1118
1119 let [self_hold, read_hold] =
1120 dependency_read_holds.try_into().expect("two holds");
1121
1122 let state = ExportState::new(
1123 desc.instance_id,
1124 read_hold,
1125 self_hold,
1126 write_frontier.clone(),
1127 ReadPolicy::step_back(),
1128 );
1129 maybe_instance_id = Some(state.cluster_id);
1130 extra_state = CollectionStateExtra::Export(state);
1131 }
1132 }
1133
1134 let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
1135 let collection_state =
1136 CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);
1137
1138 self.collections.insert(id, collection_state);
1139 }
1140
1141 {
1142 let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1143
1144 for id in new_webhook_statistic_entries {
1147 source_statistics.webhook_statistics.entry(id).or_default();
1148 }
1149
1150 }
1154
1155 if !table_registers.is_empty() {
1157 let register_ts = register_ts
1158 .expect("caller should have provided a register_ts when creating a table");
1159
1160 if self.read_only {
1161 table_registers
1171 .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));
1172
1173 self.persist_table_worker
1174 .register(register_ts, table_registers)
1175 .await
1176 .expect("table worker unexpectedly shut down");
1177 } else {
1178 self.persist_table_worker
1179 .register(register_ts, table_registers)
1180 .await
1181 .expect("table worker unexpectedly shut down");
1182 }
1183 }
1184
1185 self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);
1186
1187 for id in to_execute {
1189 match &self.collection(id)?.data_source {
1190 DataSource::Ingestion(ingestion) => {
1191 if !self.read_only
1192 || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
1193 && ingestion.desc.connection.supports_read_only())
1194 {
1195 self.run_ingestion(id)?;
1196 }
1197 }
1198 DataSource::IngestionExport { .. } => unreachable!(
1199 "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
1200 ),
1201 DataSource::Introspection(_)
1202 | DataSource::Webhook
1203 | DataSource::Table
1204 | DataSource::Progress
1205 | DataSource::Other => {}
1206 DataSource::Sink { .. } => {
1207 if !self.read_only {
1208 self.run_export(id)?;
1209 }
1210 }
1211 };
1212 }
1213
1214 Ok(())
1215 }
1216
1217 fn check_alter_ingestion_source_desc(
1218 &mut self,
1219 ingestion_id: GlobalId,
1220 source_desc: &SourceDesc,
1221 ) -> Result<(), StorageError<Self::Timestamp>> {
1222 let source_collection = self.collection(ingestion_id)?;
1223 let data_source = &source_collection.data_source;
1224 match &data_source {
1225 DataSource::Ingestion(cur_ingestion) => {
1226 cur_ingestion
1227 .desc
1228 .alter_compatible(ingestion_id, source_desc)?;
1229 }
1230 o => {
1231 tracing::info!(
1232 "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1233 o
1234 );
1235 Err(AlterError { id: ingestion_id })?
1236 }
1237 }
1238
1239 Ok(())
1240 }
1241
1242 async fn alter_ingestion_source_desc(
1243 &mut self,
1244 ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
1245 ) -> Result<(), StorageError<Self::Timestamp>> {
1246 let mut ingestions_to_run = BTreeSet::new();
1247
1248 for (id, new_desc) in ingestion_ids {
1249 let collection = self
1250 .collections
1251 .get_mut(&id)
1252 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1253
1254 match &mut collection.data_source {
1255 DataSource::Ingestion(ingestion) => {
1256 if ingestion.desc != new_desc {
1257 tracing::info!(
1258 from = ?ingestion.desc,
1259 to = ?new_desc,
1260 "alter_ingestion_source_desc, updating"
1261 );
1262 ingestion.desc = new_desc;
1263 ingestions_to_run.insert(id);
1264 }
1265 }
1266 o => {
1267 tracing::warn!("alter_ingestion_source_desc called on {:?}", o);
1268 Err(StorageError::IdentifierInvalid(id))?;
1269 }
1270 }
1271 }
1272
1273 for id in ingestions_to_run {
1274 self.run_ingestion(id)?;
1275 }
1276 Ok(())
1277 }
1278
1279 async fn alter_ingestion_connections(
1280 &mut self,
1281 source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1282 ) -> Result<(), StorageError<Self::Timestamp>> {
1283 let mut ingestions_to_run = BTreeSet::new();
1284
1285 for (id, conn) in source_connections {
1286 let collection = self
1287 .collections
1288 .get_mut(&id)
1289 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1290
1291 match &mut collection.data_source {
1292 DataSource::Ingestion(ingestion) => {
1293 if ingestion.desc.connection != conn {
1296 tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1297 ingestion.desc.connection = conn;
1298 ingestions_to_run.insert(id);
1299 } else {
1300 tracing::warn!(
1301 "update_source_connection called on {id} but the \
1302 connection was the same"
1303 );
1304 }
1305 }
1306 o => {
1307 tracing::warn!("update_source_connection called on {:?}", o);
1308 Err(StorageError::IdentifierInvalid(id))?;
1309 }
1310 }
1311 }
1312
1313 for id in ingestions_to_run {
1314 self.run_ingestion(id)?;
1315 }
1316 Ok(())
1317 }
1318
    /// Alters the data config of the given source exports (subsources) and
    /// schedules their parent ingestions to be re-executed.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Parent ingestions that must be restarted to pick up the change.
        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // Update the config on the export's own collection state and
            // learn which ingestion it belongs to.
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Mirror the change into the parent ingestion's description of
            // its source exports, which is what drives the dataflow.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // Only restart the ingestion when the config actually
                    // changed.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }
1387
    /// Evolves the schema of a table: registers `new_collection`, writing to
    /// the same persist shard as `existing_collection` but with `new_desc`,
    /// at schema version `expected_version`.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            // Only plain tables support in-place schema evolution.
            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if existing.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Let StorageCollections evolve the persist-level schema first;
            // this fails if `expected_version` does not match reality.
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            // The new collection writes to the same shard as the old one.
            existing.collection_metadata.data_shard.clone()
        };

        // Open a write handle for the shard at the new schema.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // Install in-memory state for the new collection.
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            DataSource::Table,
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        self.collections.insert(new_collection, collection_state);

        // Route future table writes for the new collection through the
        // persist table worker, effective at `register_ts`.
        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        // Record the (collection -> shard) mapping in introspection.
        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }
1466
1467 fn export(
1468 &self,
1469 id: GlobalId,
1470 ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1471 self.collections
1472 .get(&id)
1473 .and_then(|c| match &c.extra_state {
1474 CollectionStateExtra::Export(state) => Some(state),
1475 _ => None,
1476 })
1477 .ok_or(StorageError::IdentifierMissing(id))
1478 }
1479
1480 fn export_mut(
1481 &mut self,
1482 id: GlobalId,
1483 ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1484 self.collections
1485 .get_mut(&id)
1486 .and_then(|c| match &mut c.extra_state {
1487 CollectionStateExtra::Export(state) => Some(state),
1488 _ => None,
1489 })
1490 .ok_or(StorageError::IdentifierMissing(id))
1491 }
1492
1493 async fn create_oneshot_ingestion(
1495 &mut self,
1496 ingestion_id: uuid::Uuid,
1497 collection_id: GlobalId,
1498 instance_id: StorageInstanceId,
1499 request: OneshotIngestionRequest,
1500 result_tx: OneshotResultCallback<ProtoBatch>,
1501 ) -> Result<(), StorageError<Self::Timestamp>> {
1502 let collection_meta = self
1503 .collections
1504 .get(&collection_id)
1505 .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1506 .collection_metadata
1507 .clone();
1508 let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1509 StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1511 })?;
1512 let oneshot_cmd = RunOneshotIngestion {
1513 ingestion_id,
1514 collection_id,
1515 collection_meta,
1516 request,
1517 };
1518
1519 if !self.read_only {
1520 instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1521 let pending = PendingOneshotIngestion {
1522 result_tx,
1523 cluster_id: instance_id,
1524 };
1525 let novel = self
1526 .pending_oneshot_ingestions
1527 .insert(ingestion_id, pending);
1528 assert_none!(novel);
1529 Ok(())
1530 } else {
1531 Err(StorageError::ReadOnly)
1532 }
1533 }
1534
1535 fn cancel_oneshot_ingestion(
1536 &mut self,
1537 ingestion_id: uuid::Uuid,
1538 ) -> Result<(), StorageError<Self::Timestamp>> {
1539 if self.read_only {
1540 return Err(StorageError::ReadOnly);
1541 }
1542
1543 let pending = self
1544 .pending_oneshot_ingestions
1545 .remove(&ingestion_id)
1546 .ok_or_else(|| {
1547 StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1549 })?;
1550
1551 match self.instances.get_mut(&pending.cluster_id) {
1552 Some(instance) => {
1553 instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1554 }
1555 None => {
1556 mz_ore::soft_panic_or_log!(
1557 "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1558 ingestion_id,
1559 pending.cluster_id,
1560 );
1561 }
1562 }
1563 pending.cancel();
1565
1566 Ok(())
1567 }
1568
1569 async fn alter_export(
1570 &mut self,
1571 id: GlobalId,
1572 new_description: ExportDescription<Self::Timestamp>,
1573 ) -> Result<(), StorageError<Self::Timestamp>> {
1574 let from_id = new_description.sink.from;
1575
1576 let desired_read_holds = vec![from_id.clone(), id.clone()];
1579 let [input_hold, self_hold] = self
1580 .storage_collections
1581 .acquire_read_holds(desired_read_holds)
1582 .expect("missing dependency")
1583 .try_into()
1584 .expect("expected number of holds");
1585 let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1586 let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1587
1588 let cur_export = self.export_mut(id)?;
1590 let input_readable = cur_export
1591 .write_frontier
1592 .iter()
1593 .all(|t| input_hold.since().less_than(t));
1594 if !input_readable {
1595 return Err(StorageError::ReadBeforeSince(from_id));
1596 }
1597
1598 let new_export = ExportState {
1599 read_capabilities: cur_export.read_capabilities.clone(),
1600 cluster_id: new_description.instance_id,
1601 derived_since: cur_export.derived_since.clone(),
1602 read_holds: [input_hold, self_hold],
1603 read_policy: cur_export.read_policy.clone(),
1604 write_frontier: cur_export.write_frontier.clone(),
1605 };
1606 *cur_export = new_export;
1607
1608 let with_snapshot = new_description.sink.with_snapshot
1615 && !PartialOrder::less_than(&new_description.sink.as_of, &cur_export.write_frontier);
1616
1617 let cmd = RunSinkCommand {
1618 id,
1619 description: StorageSinkDesc {
1620 from: from_id,
1621 from_desc: new_description.sink.from_desc,
1622 connection: new_description.sink.connection,
1623 envelope: new_description.sink.envelope,
1624 as_of: new_description.sink.as_of,
1625 version: new_description.sink.version,
1626 from_storage_metadata,
1627 with_snapshot,
1628 to_storage_metadata,
1629 commit_interval: new_description.sink.commit_interval,
1630 },
1631 };
1632
1633 let instance = self
1635 .instances
1636 .get_mut(&new_description.instance_id)
1637 .ok_or_else(|| StorageError::ExportInstanceMissing {
1638 storage_instance_id: new_description.instance_id,
1639 export_id: id,
1640 })?;
1641
1642 instance.send(StorageCommand::RunSink(Box::new(cmd)));
1643 Ok(())
1644 }
1645
    /// Replaces the connection of the given exports (sinks) and issues
    /// updated `RunSink` commands, batched per storage instance.
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Commands plus updated descriptions, grouped by hosting instance.
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            // Fetch the current description and the as-of implied by the
            // export's input read hold.
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Swapping the connection must not change the sink in any other
            // incompatible way.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                    commit_interval: new_export_description.sink.commit_interval,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Resolve the instance once per batch; report the first export id
            // if it is missing.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update the in-memory descriptions only after the commands have
            // been sent.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }
1757
1758 fn drop_tables(
1773 &mut self,
1774 storage_metadata: &StorageMetadata,
1775 identifiers: Vec<GlobalId>,
1776 ts: Self::Timestamp,
1777 ) -> Result<(), StorageError<Self::Timestamp>> {
1778 let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1780 .into_iter()
1781 .partition(|id| match self.collections[id].data_source {
1782 DataSource::Table => true,
1783 DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1784 _ => panic!("identifier is not a table: {}", id),
1785 });
1786
1787 if table_write_ids.len() > 0 {
1789 let drop_notif = self
1790 .persist_table_worker
1791 .drop_handles(table_write_ids.clone(), ts);
1792 let tx = self.pending_table_handle_drops_tx.clone();
1793 mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1794 drop_notif.await;
1795 for identifier in table_write_ids {
1796 let _ = tx.send(identifier);
1797 }
1798 });
1799 }
1800
1801 if data_source_ids.len() > 0 {
1803 self.validate_collection_ids(data_source_ids.iter().cloned())?;
1804 self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1805 }
1806
1807 Ok(())
1808 }
1809
1810 fn drop_sources(
1811 &mut self,
1812 storage_metadata: &StorageMetadata,
1813 identifiers: Vec<GlobalId>,
1814 ) -> Result<(), StorageError<Self::Timestamp>> {
1815 self.validate_collection_ids(identifiers.iter().cloned())?;
1816 self.drop_sources_unvalidated(storage_metadata, identifiers)
1817 }
1818
    /// Drops the given source collections without validating the ids;
    /// unknown ids are silently skipped.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Ingestions that must be re-executed because one of their exports
        // (subsources) goes away while the ingestion itself remains.
        let mut ingestions_to_execute = BTreeSet::new();
        // Ingestions and ingestion exports whose dataflows must be stopped.
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        // Collections that only need their in-memory state cleaned up.
        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Webhooks are managed by the collection manager;
                        // unregister them asynchronously.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping an export requires re-running the parent
                        // ingestion with the export removed.
                        ingestions_to_execute.insert(ingestion_id);

                        // Remove the export from the parent ingestion's
                        // in-memory description.
                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        // These are not sources; reaching this arm indicates
                        // a bug in the caller.
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not re-execute ingestions that are themselves being dropped.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Dropping an ingestion means releasing all of its read holds, which
        // we do by installing an empty read-hold policy.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Retract the (collection -> shard) mappings from introspection.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        // Record a final `Dropped` status for each dropped ingestion.
        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        // Drop per-collection statistics.
        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        // Remove the in-memory collection state and remember which replicas
        // still need to acknowledge the drop.
        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    match &collection.data_source {
                        // Dropping an ingestion also implicitly drops its
                        // remap collection, unless that is dropped on its
                        // own.
                        DataSource::Ingestion(ingestion_desc) => {
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Finally, let StorageCollections release the durable state.
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }
2014
2015 fn drop_sinks(
2017 &mut self,
2018 storage_metadata: &StorageMetadata,
2019 identifiers: Vec<GlobalId>,
2020 ) -> Result<(), StorageError<Self::Timestamp>> {
2021 self.validate_export_ids(identifiers.iter().cloned())?;
2022 self.drop_sinks_unvalidated(storage_metadata, identifiers);
2023 Ok(())
2024 }
2025
2026 fn drop_sinks_unvalidated(
2027 &mut self,
2028 storage_metadata: &StorageMetadata,
2029 mut sinks_to_drop: Vec<GlobalId>,
2030 ) {
2031 sinks_to_drop.retain(|id| self.export(*id).is_ok());
2033
2034 let drop_policy = sinks_to_drop
2041 .iter()
2042 .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2043 .collect();
2044
2045 tracing::debug!(
2046 ?drop_policy,
2047 "dropping sources by setting read hold policies"
2048 );
2049 self.set_hold_policies(drop_policy);
2050
2051 let status_now = mz_ore::now::to_datetime((self.now)());
2058
2059 let mut status_updates = vec![];
2061 {
2062 let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2063 for id in sinks_to_drop.iter() {
2064 status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2065 sink_statistics.retain(|(stats_id, _), _| stats_id != id);
2066 }
2067 }
2068
2069 if !self.read_only {
2070 self.append_status_introspection_updates(
2071 IntrospectionType::SinkStatusHistory,
2072 status_updates,
2073 );
2074 }
2075
2076 for id in sinks_to_drop.iter() {
2078 tracing::info!(%id, "dropping export state");
2079 let collection = self
2080 .collections
2081 .remove(id)
2082 .expect("list populated after checking that self.collections contains it");
2083
2084 let instance = match &collection.extra_state {
2085 CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2086 CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2087 CollectionStateExtra::None => None,
2088 }
2089 .and_then(|i| self.instances.get(&i));
2090
2091 if let Some(instance) = instance {
2095 let active_replicas = instance.get_active_replicas_for_object(id);
2096 if !active_replicas.is_empty() {
2097 self.dropped_objects.insert(*id, active_replicas);
2098 }
2099 }
2100 }
2101
2102 self.storage_collections
2104 .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2105 }
2106
2107 #[instrument(level = "debug")]
2108 fn append_table(
2109 &mut self,
2110 write_ts: Self::Timestamp,
2111 advance_to: Self::Timestamp,
2112 commands: Vec<(GlobalId, Vec<TableData>)>,
2113 ) -> Result<
2114 tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2115 StorageError<Self::Timestamp>,
2116 > {
2117 if self.read_only {
2118 if !commands
2121 .iter()
2122 .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2123 {
2124 return Err(StorageError::ReadOnly);
2125 }
2126 }
2127
2128 for (id, updates) in commands.iter() {
2130 if !updates.is_empty() {
2131 if !write_ts.less_than(&advance_to) {
2132 return Err(StorageError::UpdateBeyondUpper(*id));
2133 }
2134 }
2135 }
2136
2137 Ok(self
2138 .persist_table_worker
2139 .append(write_ts, advance_to, commands))
2140 }
2141
    /// Returns an appender for monotonically writing to the collection
    /// identified by `id`.
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        // Delegates to the collection manager, which errors if `id` is not
        // registered for managed writes.
        self.collection_manager.monotonic_appender(id)
    }
2148
2149 fn webhook_statistics(
2150 &self,
2151 id: GlobalId,
2152 ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2153 let source_statistics = self.source_statistics.lock().expect("poisoned");
2155 source_statistics
2156 .webhook_statistics
2157 .get(&id)
2158 .cloned()
2159 .ok_or(StorageError::IdentifierMissing(id))
2160 }
2161
    /// Waits until the controller has work for `process` to do.
    async fn ready(&mut self) {
        // Maintenance is already due: `process` has work right away.
        if self.maintenance_scheduled {
            return;
        }

        // Pending table-handle drops are also handled by `process`.
        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            // Stash the first response, then drain everything that is
            // immediately available so one `process` call handles the batch.
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }
2183
    /// Processes stashed replica responses and any scheduled maintenance,
    /// returning accumulated frontier updates to the caller, if any.
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        // Run maintenance that `ready` scheduled.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        // Give instances a chance to restart dataflows on failed replicas.
        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        // Handle the responses batched up by `ready`.
        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    // A replica acknowledged a drop; once every replica has,
                    // forget the object entirely.
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    // Fold per-replica source statistics into shared state.
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // Skip statistics for collections that have since
                            // been dropped.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    // Same procedure for sink statistics.
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    // Track per-replica hydration state for ingestions based
                    // on Running/Paused transitions.
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            if ingestion_state.hydrated_on.is_empty() {
                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                            }
                                            ingestion_state.hydrated_on.insert(replica_id.expect(
                                                "replica id should be present for status running",
                                            ));
                                        }
                                        CollectionStateExtra::Export(_) => {
                                        }
                                        CollectionStateExtra::None => {
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                            ingestion_state.hydrated_on.clear();
                                        }
                                        CollectionStateExtra::Export(_) => {
                                        }
                                        CollectionStateExtra::None => {
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    // Attribute the status update to its replica, if known.
                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Stop the running dataflow before handing
                                // the staged batches to the caller.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                (pending.result_tx)(batches)
                            }
                            None => {
                            }
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        // Clean up tables whose write handles have been fully dropped.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }
2444
2445 async fn inspect_persist_state(
2446 &self,
2447 id: GlobalId,
2448 ) -> Result<serde_json::Value, anyhow::Error> {
2449 let collection = &self.storage_collections.collection_metadata(id)?;
2450 let client = self
2451 .persist
2452 .open(collection.persist_location.clone())
2453 .await?;
2454 let shard_state = client
2455 .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2456 .await?;
2457 let json_state = serde_json::to_value(shard_state)?;
2458 Ok(json_state)
2459 }
2460
2461 fn append_introspection_updates(
2462 &mut self,
2463 type_: IntrospectionType,
2464 updates: Vec<(Row, Diff)>,
2465 ) {
2466 let id = self.introspection_ids[&type_];
2467 let updates = updates.into_iter().map(|update| update.into()).collect();
2468 self.collection_manager.blind_write(id, updates);
2469 }
2470
2471 fn append_status_introspection_updates(
2472 &mut self,
2473 type_: IntrospectionType,
2474 updates: Vec<StatusUpdate>,
2475 ) {
2476 let id = self.introspection_ids[&type_];
2477 let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2478 if !updates.is_empty() {
2479 self.collection_manager.blind_write(id, updates);
2480 }
2481 }
2482
    /// Applies a differential write `op` to the introspection collection
    /// identified by `type_`.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        // Panics if `type_` was never registered with an output collection.
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }
2487
    /// Returns a sender for submitting append-only updates (with a completion
    /// callback) to the introspection collection identified by `type_`.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        // Panics if `type_` was never registered with an output collection.
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }
2498
    /// Returns a sender for submitting differential write ops (with a
    /// completion callback) to the introspection collection identified by
    /// `type_`.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        // Panics if `type_` was never registered with an output collection.
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }
2509
    /// Returns a future resolving to a "real-time recency" timestamp: the
    /// maximum, over every eligible user source in `timestamp_objects`, of
    /// the timestamp reflecting that source's current upstream contents.
    ///
    /// Only user collections backed by Kafka/Postgres/MySQL/SQL Server
    /// ingestions participate; load-generator sources and every other kind
    /// of data source are skipped. Each per-source probe is bounded by
    /// `timeout` and maps elapsed timeouts to `StorageError::RtrTimeout`.
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        // One probe future per eligible source, keyed by source ID so the
        // final fold can attribute timeouts to a specific source.
        let mut rtr_futures = BTreeMap::new();

        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            // Unknown collections are simply skipped rather than erroring.
            let collection = match self.collection(id) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    // Load generators have no external upstream to probe.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            // Snapshot handle on the remap (progress) collection.
            // NOTE(review): this handle is opened before the read hold below
            // is acquired; presumably other outstanding holds keep the since
            // from advancing in between — confirm.
            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold the remap collection's since for the duration of the
            // probe so the subscribe below remains valid.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            // An empty since means the collection is closed and unreadable.
            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(
                        source_conn,
                        id,
                        config,
                        as_of,
                        remap_subscribe,
                    )
                    .await
                    .map_err(|e| {
                        tracing::debug!(?id, "real time recency error: {:?}", e);
                        e
                    });

                    // Keep the read hold alive until the probe finishes.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        // Resolve all probes concurrently and fold to the maximum timestamp;
        // any single timeout or error fails the whole computation.
        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
2628
    /// Renders a JSON snapshot of the controller's in-memory state for
    /// debugging/introspection purposes.
    ///
    /// The exhaustive destructuring below is deliberate: adding a field to
    /// the controller forces a compile error here, so the dump cannot
    /// silently fall out of date. Fields bound to `_` are intentionally
    /// excluded from the output. Values are rendered via their `Debug`
    /// representations, so the output format is not stable.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            now: _,
            read_only,
            collections,
            dropped_objects,
            persist_table_worker: _,
            txns_read: _,
            txns_metrics: _,
            stashed_responses,
            pending_table_handle_drops_tx: _,
            pending_table_handle_drops_rx: _,
            pending_oneshot_ingestions,
            collection_manager: _,
            introspection_ids,
            introspection_tokens: _,
            source_statistics: _,
            sink_statistics: _,
            statistics_interval_sender: _,
            instances,
            initialized,
            config,
            persist_location,
            persist: _,
            metrics: _,
            recorded_frontiers,
            recorded_replica_frontiers,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            storage_collections: _,
            migrated_storage_collections,
            maintenance_ticker: _,
            maintenance_scheduled,
            instance_response_tx: _,
            instance_response_rx: _,
            persist_warm_task: _,
        } = self;

        // Keyed maps are re-keyed by stringified IDs so they serialize as
        // JSON objects; values are Debug-formatted.
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let dropped_objects: BTreeMap<_, _> = dropped_objects
            .iter()
            .map(|(id, rs)| (id.to_string(), format!("{rs:?}")))
            .collect();
        let stashed_responses: Vec<_> =
            stashed_responses.iter().map(|r| format!("{r:?}")).collect();
        let pending_oneshot_ingestions: BTreeMap<_, _> = pending_oneshot_ingestions
            .iter()
            .map(|(uuid, i)| (uuid.to_string(), format!("{i:?}")))
            .collect();
        let introspection_ids: BTreeMap<_, _> = introspection_ids
            .iter()
            .map(|(typ, id)| (format!("{typ:?}"), id.to_string()))
            .collect();
        let instances: BTreeMap<_, _> = instances
            .iter()
            .map(|(id, i)| (id.to_string(), format!("{i:?}")))
            .collect();
        let recorded_frontiers: BTreeMap<_, _> = recorded_frontiers
            .iter()
            .map(|(id, fs)| (id.to_string(), format!("{fs:?}")))
            .collect();
        // Tuple keys cannot be JSON object keys, so replica frontiers are
        // emitted as a list of (collection, replica, frontier) triples.
        let recorded_replica_frontiers: Vec<_> = recorded_replica_frontiers
            .iter()
            .map(|((gid, rid), f)| (gid.to_string(), rid.to_string(), format!("{f:?}")))
            .collect();
        let migrated_storage_collections: Vec<_> = migrated_storage_collections
            .iter()
            .map(|id| id.to_string())
            .collect();

        Ok(serde_json::json!({
            "read_only": read_only,
            "collections": collections,
            "dropped_objects": dropped_objects,
            "stashed_responses": stashed_responses,
            "pending_oneshot_ingestions": pending_oneshot_ingestions,
            "introspection_ids": introspection_ids,
            "instances": instances,
            "initialized": initialized,
            "config": format!("{config:?}"),
            "persist_location": format!("{persist_location:?}"),
            "recorded_frontiers": recorded_frontiers,
            "recorded_replica_frontiers": recorded_replica_frontiers,
            "wallclock_lag_last_recorded": format!("{wallclock_lag_last_recorded:?}"),
            "migrated_storage_collections": migrated_storage_collections,
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
2722}
2723
2724pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2731 if txn.get_txn_wal_shard().is_none() {
2732 let txns_id = ShardId::new();
2733 txn.write_txn_wal_shard(txns_id)?;
2734 }
2735
2736 Ok(())
2737}
2738
2739impl<T> Controller<T>
2740where
2741 T: Timestamp
2742 + Lattice
2743 + TotalOrder
2744 + Codec64
2745 + From<EpochMillis>
2746 + TimestampManipulation
2747 + Into<Datum<'static>>,
2748 Self: StorageController<Timestamp = T>,
2749{
    /// Creates a new storage controller.
    ///
    /// Callers must have called [`prepare_initialization`] on `txn` first;
    /// otherwise this panics when reading the txn-WAL shard ID.
    ///
    /// In `read_only` mode the table-write worker only follows the txns
    /// shard's upper instead of opening a full txns handle, so no writes can
    /// be issued.
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Kick off background warming of persist state for all known
        // collections; dropping the handle aborts the task.
        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        let persist_table_worker = if read_only {
            // Read-only mode: only follow the txns shard's upper; never
            // acquire write capabilities via a txns handle.
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let mut txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;
            txns.upgrade_version().await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        // Introspection collections register themselves later via
        // `register_introspection_collection`; start empty.
        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        // Skip (rather than burst) maintenance ticks missed under load.
        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }
2879
    /// Installs new read-hold policies for the given collections and applies
    /// any resulting read-capability changes.
    ///
    /// Must only be called for ingestions or exports; hitting a collection
    /// with no extra state is a bug (`unreachable!`). Unknown IDs are
    /// silently skipped.
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                // Derive the since implied by the new policy at the current
                // write frontier and record the delta against the old one.
                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }
2924
2925 #[instrument(level = "debug", fields(updates))]
2926 fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2927 let mut read_capability_changes = BTreeMap::default();
2928
2929 if let Some(collection) = self.collections.get_mut(&id) {
2930 let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2931 CollectionStateExtra::Ingestion(ingestion) => (
2932 &mut ingestion.write_frontier,
2933 &mut ingestion.derived_since,
2934 &ingestion.hold_policy,
2935 ),
2936 CollectionStateExtra::None => {
2937 if matches!(collection.data_source, DataSource::Progress) {
2938 } else {
2940 tracing::error!(
2941 ?collection,
2942 ?new_upper,
2943 "updated write frontier for collection which is not an ingestion"
2944 );
2945 }
2946 return;
2947 }
2948 CollectionStateExtra::Export(export) => (
2949 &mut export.write_frontier,
2950 &mut export.derived_since,
2951 &export.read_policy,
2952 ),
2953 };
2954
2955 if PartialOrder::less_than(write_frontier, new_upper) {
2956 write_frontier.clone_from(new_upper);
2957 }
2958
2959 let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2960 let mut update = swap_updates(derived_since, new_derived_since);
2961 if !update.is_empty() {
2962 read_capability_changes.insert(id, update);
2963 }
2964 } else if self.dropped_objects.contains_key(&id) {
2965 } else {
2968 soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2969 }
2970
2971 if !read_capability_changes.is_empty() {
2972 self.update_hold_capabilities(&mut read_capability_changes);
2973 }
2974 }
2975
2976 #[instrument(level = "debug", fields(updates))]
2980 fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2981 let mut collections_net = BTreeMap::new();
2983
2984 while let Some(key) = updates.keys().rev().next().cloned() {
2989 let mut update = updates.remove(&key).unwrap();
2990
2991 if key.is_user() {
2992 debug!(id = %key, ?update, "update_hold_capability");
2993 }
2994
2995 if let Some(collection) = self.collections.get_mut(&key) {
2996 match &mut collection.extra_state {
2997 CollectionStateExtra::Ingestion(ingestion) => {
2998 let changes = ingestion.read_capabilities.update_iter(update.drain());
2999 update.extend(changes);
3000
3001 let (changes, frontier, _cluster_id) =
3002 collections_net.entry(key).or_insert_with(|| {
3003 (
3004 <ChangeBatch<_>>::new(),
3005 Antichain::new(),
3006 ingestion.instance_id,
3007 )
3008 });
3009
3010 changes.extend(update.drain());
3011 *frontier = ingestion.read_capabilities.frontier().to_owned();
3012 }
3013 CollectionStateExtra::None => {
3014 soft_panic_or_log!(
3016 "trying to update holds for collection {collection:?} which is not \
3017 an ingestion: {update:?}"
3018 );
3019 continue;
3020 }
3021 CollectionStateExtra::Export(export) => {
3022 let changes = export.read_capabilities.update_iter(update.drain());
3023 update.extend(changes);
3024
3025 let (changes, frontier, _cluster_id) =
3026 collections_net.entry(key).or_insert_with(|| {
3027 (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
3028 });
3029
3030 changes.extend(update.drain());
3031 *frontier = export.read_capabilities.frontier().to_owned();
3032 }
3033 }
3034 } else {
3035 tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
3037 }
3038 }
3039
3040 for (key, (mut changes, frontier, cluster_id)) in collections_net {
3043 if !changes.is_empty() {
3044 if key.is_user() {
3045 debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
3046 }
3047
3048 let collection = self
3049 .collections
3050 .get_mut(&key)
3051 .expect("missing collection state");
3052
3053 let read_holds = match &mut collection.extra_state {
3054 CollectionStateExtra::Ingestion(ingestion) => {
3055 ingestion.dependency_read_holds.as_mut_slice()
3056 }
3057 CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
3058 CollectionStateExtra::None => {
3059 soft_panic_or_log!(
3060 "trying to downgrade read holds for collection which is not an \
3061 ingestion: {collection:?}"
3062 );
3063 continue;
3064 }
3065 };
3066
3067 for read_hold in read_holds.iter_mut() {
3068 read_hold
3069 .try_downgrade(frontier.clone())
3070 .expect("we only advance the frontier");
3071 }
3072
3073 if let Some(instance) = self.instances.get_mut(&cluster_id) {
3075 instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
3076 } else {
3077 soft_panic_or_log!(
3078 "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
3079 );
3080 }
3081 }
3082 }
3083 }
3084
3085 fn validate_collection_ids(
3087 &self,
3088 ids: impl Iterator<Item = GlobalId>,
3089 ) -> Result<(), StorageError<T>> {
3090 for id in ids {
3091 self.storage_collections.check_exists(id)?;
3092 }
3093 Ok(())
3094 }
3095
3096 fn validate_export_ids(
3098 &self,
3099 ids: impl Iterator<Item = GlobalId>,
3100 ) -> Result<(), StorageError<T>> {
3101 for id in ids {
3102 self.export(id)?;
3103 }
3104 Ok(())
3105 }
3106
3107 async fn open_data_handles(
3115 &self,
3116 id: &GlobalId,
3117 shard: ShardId,
3118 relation_desc: RelationDesc,
3119 persist_client: &PersistClient,
3120 ) -> WriteHandle<SourceData, (), T, StorageDiff> {
3121 let diagnostics = Diagnostics {
3122 shard_name: id.to_string(),
3123 handle_purpose: format!("controller data for {}", id),
3124 };
3125
3126 let mut write = persist_client
3127 .open_writer(
3128 shard,
3129 Arc::new(relation_desc),
3130 Arc::new(UnitSchema),
3131 diagnostics.clone(),
3132 )
3133 .await
3134 .expect("invalid persist usage");
3135
3136 write.fetch_recent_upper().await;
3145
3146 write
3147 }
3148
    /// Registers `id` as the backing collection for `introspection_type` and
    /// hands its write handle to the collection manager, wired up as either
    /// a differential or an append-only collection depending on the type.
    ///
    /// Panics if the introspection type already has a registered ID.
    /// In read-only mode, migrated system collections are still registered
    /// as writable ("force writable") so migrations can proceed.
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        // Each introspection type maps to exactly one collection ID.
        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        // Factory producing fresh leased read handles on demand; the
        // collection manager calls this whenever it needs a snapshot.
        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }
3261
3262 fn reconcile_dangling_statistics(&self) {
3265 self.source_statistics
3266 .lock()
3267 .expect("poisoned")
3268 .source_statistics
3269 .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3271 self.sink_statistics
3272 .lock()
3273 .expect("poisoned")
3274 .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3275 }
3276
3277 #[instrument(level = "debug")]
3287 fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3288 where
3289 I: Iterator<Item = GlobalId>,
3290 {
3291 mz_ore::soft_assert_or_log!(
3292 diff == Diff::MINUS_ONE || diff == Diff::ONE,
3293 "use 1 for insert or -1 for delete"
3294 );
3295
3296 let id = *self
3297 .introspection_ids
3298 .get(&IntrospectionType::ShardMapping)
3299 .expect("should be registered before this call");
3300
3301 let mut updates = vec![];
3302 let mut row_buf = Row::default();
3304
3305 for global_id in global_ids {
3306 let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3307 collection.collection_metadata.data_shard.clone()
3308 } else {
3309 panic!("unknown global id: {}", global_id);
3310 };
3311
3312 let mut packer = row_buf.packer();
3313 packer.push(Datum::from(global_id.to_string().as_str()));
3314 packer.push(Datum::from(shard_id.to_string().as_str()));
3315 updates.push((row_buf.clone(), diff));
3316 }
3317
3318 self.collection_manager.differential_append(id, updates);
3319 }
3320
    /// Computes the set of collections whose read frontiers `self_id` must
    /// hold back, based on its data source.
    ///
    /// - Introspection/webhook/table/progress/other sources have no storage
    ///   dependencies (beyond an optional `primary` collection).
    /// - An ingestion export depends on itself and its ingestion's remap
    ///   (progress) collection.
    /// - An ingestion depends on itself and (if distinct) its remap
    ///   collection.
    /// - A sink depends on itself and the collection it exports.
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport { ingestion_id, .. } => {
                // Exports depend on their parent ingestion's remap
                // collection, which we must look up via the parent.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                dependencies.extend([self_id, ingestion_remap_collection_id]);
            }
            DataSource::Ingestion(ingestion) => {
                dependencies.push(self_id);
                // Avoid a duplicate entry when the ingestion is its own
                // remap collection.
                if self_id != ingestion.remap_collection_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => {
                dependencies.extend([self_id, desc.sink.from]);
            }
        };

        Ok(dependencies)
    }
3376
3377 async fn read_handle_for_snapshot(
3378 &self,
3379 id: GlobalId,
3380 ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3381 let metadata = self.storage_collections.collection_metadata(id)?;
3382 read_handle_for_snapshot(&self.persist, id, &metadata).await
3383 }
3384
3385 fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3388 if self.read_only {
3389 return;
3390 }
3391
3392 let mut sink_status_updates = vec![];
3393 let mut source_status_updates = vec![];
3394
3395 for update in updates {
3396 let id = update.id;
3397 if self.export(id).is_ok() {
3398 sink_status_updates.push(update);
3399 } else if self.storage_collections.check_exists(id).is_ok() {
3400 source_status_updates.push(update);
3401 }
3402 }
3403
3404 self.append_status_introspection_updates(
3405 IntrospectionType::SourceStatusHistory,
3406 source_status_updates,
3407 );
3408 self.append_status_introspection_updates(
3409 IntrospectionType::SinkStatusHistory,
3410 sink_status_updates,
3411 );
3412 }
3413
3414 fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3415 self.collections
3416 .get(&id)
3417 .ok_or(StorageError::IdentifierMissing(id))
3418 }
3419
    /// Builds a fully-resolved ingestion description for `id` (attaching
    /// storage metadata to each source export and to the remap collection)
    /// and sends a `RunIngestion` command to the owning cluster instance.
    ///
    /// Fails if `id` is not an ingestion, if any export or the remap
    /// collection is unknown, or if the owning instance is missing.
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Resolve each export's storage metadata so the cluster can write
        // directly to the right shards.
        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }
3474
    /// Builds a `RunSinkCommand` for export `id` and sends it to the owning
    /// cluster instance.
    ///
    /// The effective `as_of` is the sink's configured `as_of` joined with
    /// the export's implied read capability. The snapshot is only emitted
    /// when configured AND the as_of has not already passed the write
    /// frontier (i.e. the snapshot was not already written).
    ///
    /// Fails if `id` is not a sink, if any required metadata is missing, or
    /// if the owning instance is unknown.
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        let export_state = self.storage_collections.collection_frontiers(id)?;
        // Advance the configured as_of up to what we can actually read.
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: description.sink.commit_interval,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }
3535
    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the currently observed frontiers against the
    /// previously recorded ones and appending only the resulting
    /// retractions/insertions.
    fn update_frontier_introspection(&mut self) {
        // Gather the current state: per-collection (since, upper) and
        // per-(collection, replica) upper.
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        // Empty frontiers render as NULL in the introspection relation.
        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Swap in the new state, then diff against the old: changed entries
        // produce a retraction and an insertion, vanished entries only a
        // retraction, new entries only an insertion.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }
3634
    /// Samples the current wallclock lag of every active collection,
    /// folding each sample into the collection's running maximum, its
    /// metrics, and (when enabled) its histogram stash; finally triggers the
    /// periodic lag recording.
    ///
    /// Collections whose write frontier has not passed their read frontier
    /// are considered unreadable and get an `Undefined` lag sample.
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        // An empty frontier means the collection is closed; it can never
        // fall further behind, so its lag is zero.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lag is reported to metrics as u64::MAX.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lag values by powers of two.
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }
3709
3710 fn maybe_record_wallclock_lag(&mut self) {
3718 if self.read_only {
3719 return;
3720 }
3721
3722 let duration_trunc = |datetime: DateTime<_>, interval| {
3723 let td = TimeDelta::from_std(interval).ok()?;
3724 datetime.duration_trunc(td).ok()
3725 };
3726
3727 let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3728 let now_dt = mz_ore::now::to_datetime((self.now)());
3729 let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3730 soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3731 let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3732 duration_trunc(now_dt, *default).unwrap()
3733 });
3734 if now_trunc <= self.wallclock_lag_last_recorded {
3735 return;
3736 }
3737
3738 let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3739
3740 let mut history_updates = Vec::new();
3741 let mut histogram_updates = Vec::new();
3742 let mut row_buf = Row::default();
3743 for frontiers in self.storage_collections.active_collection_frontiers() {
3744 let id = frontiers.id;
3745 let Some(collection) = self.collections.get_mut(&id) else {
3746 continue;
3747 };
3748
3749 let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
3750 let row = Row::pack_slice(&[
3751 Datum::String(&id.to_string()),
3752 Datum::Null,
3753 max_lag.into_interval_datum(),
3754 Datum::TimestampTz(now_ts),
3755 ]);
3756 history_updates.push((row, Diff::ONE));
3757
3758 let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3759 continue;
3760 };
3761
3762 for ((period, lag, labels), count) in std::mem::take(stash) {
3763 let mut packer = row_buf.packer();
3764 packer.extend([
3765 Datum::TimestampTz(period.start),
3766 Datum::TimestampTz(period.end),
3767 Datum::String(&id.to_string()),
3768 lag.into_uint64_datum(),
3769 ]);
3770 let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3771 packer.push_dict(labels);
3772
3773 histogram_updates.push((row_buf.clone(), count));
3774 }
3775 }
3776
3777 if !history_updates.is_empty() {
3778 self.append_introspection_updates(
3779 IntrospectionType::WallclockLagHistory,
3780 history_updates,
3781 );
3782 }
3783 if !histogram_updates.is_empty() {
3784 self.append_introspection_updates(
3785 IntrospectionType::WallclockLagHistogram,
3786 histogram_updates,
3787 );
3788 }
3789
3790 self.wallclock_lag_last_recorded = now_trunc;
3791 }
3792
3793 fn maintain(&mut self) {
3798 self.update_frontier_introspection();
3799 self.refresh_wallclock_lag();
3800
3801 for instance in self.instances.values_mut() {
3803 instance.refresh_state_metrics();
3804 }
3805 }
3806}
3807
3808impl From<&IntrospectionType> for CollectionManagerKind {
3809 fn from(value: &IntrospectionType) -> Self {
3810 match value {
3811 IntrospectionType::ShardMapping
3812 | IntrospectionType::Frontiers
3813 | IntrospectionType::ReplicaFrontiers
3814 | IntrospectionType::StorageSourceStatistics
3815 | IntrospectionType::StorageSinkStatistics
3816 | IntrospectionType::ComputeDependencies
3817 | IntrospectionType::ComputeOperatorHydrationStatus
3818 | IntrospectionType::ComputeMaterializedViewRefreshes
3819 | IntrospectionType::ComputeErrorCounts
3820 | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3821
3822 IntrospectionType::SourceStatusHistory
3823 | IntrospectionType::SinkStatusHistory
3824 | IntrospectionType::PrivatelinkConnectionStatusHistory
3825 | IntrospectionType::ReplicaStatusHistory
3826 | IntrospectionType::ReplicaMetricsHistory
3827 | IntrospectionType::WallclockLagHistory
3828 | IntrospectionType::WallclockLagHistogram
3829 | IntrospectionType::PreparedStatementHistory
3830 | IntrospectionType::StatementExecutionHistory
3831 | IntrospectionType::SessionHistory
3832 | IntrospectionType::StatementLifecycleHistory
3833 | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3834 }
3835 }
3836}
3837
3838async fn snapshot_statistics<T>(
3844 id: GlobalId,
3845 upper: Antichain<T>,
3846 storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3847) -> Vec<Row>
3848where
3849 T: Codec64 + From<EpochMillis> + TimestampManipulation,
3850{
3851 match upper.as_option() {
3852 Some(f) if f > &T::minimum() => {
3853 let as_of = f.step_back().unwrap();
3854
3855 let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3856 snapshot
3857 .into_iter()
3858 .map(|(row, diff)| {
3859 assert_eq!(diff, 1);
3860 row
3861 })
3862 .collect()
3863 }
3864 _ => Vec::new(),
3867 }
3868}
3869
3870async fn read_handle_for_snapshot<T>(
3871 persist: &PersistClientCache,
3872 id: GlobalId,
3873 metadata: &CollectionMetadata,
3874) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3875where
3876 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3877{
3878 let persist_client = persist
3879 .open(metadata.persist_location.clone())
3880 .await
3881 .unwrap();
3882
3883 let read_handle = persist_client
3888 .open_leased_reader::<SourceData, (), _, _>(
3889 metadata.data_shard,
3890 Arc::new(metadata.relation_desc.clone()),
3891 Arc::new(UnitSchema),
3892 Diagnostics {
3893 shard_name: id.to_string(),
3894 handle_purpose: format!("snapshot {}", id),
3895 },
3896 USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3897 )
3898 .await
3899 .expect("invalid persist usage");
3900 Ok(read_handle)
3901}
3902
/// Per-collection state tracked by the controller.
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// Description of where this collection's data comes from.
    pub data_source: DataSource<T>,

    /// Metadata describing where and how the collection is stored.
    pub collection_metadata: CollectionMetadata,

    /// Extra state for ingestions and exports; `None` for other collections.
    pub extra_state: CollectionStateExtra<T>,

    // Maximum wallclock lag observed since the last recording; reset to
    // `WallclockLag::MIN` by `maybe_record_wallclock_lag`.
    wallclock_lag_max: WallclockLag,
    // Staged wallclock lag histogram counts, keyed by (histogram period,
    // lag bucket, labels), drained when lag is recorded to introspection.
    // `None` when no histogram is maintained for this collection (see
    // `CollectionState::new`).
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    // Metrics sink for wallclock lag observations (see `refresh_wallclock_lag`).
    wallclock_lag_metrics: WallclockLagMetrics,
}
3934
3935impl<T: TimelyTimestamp> CollectionState<T> {
3936 fn new(
3937 data_source: DataSource<T>,
3938 collection_metadata: CollectionMetadata,
3939 extra_state: CollectionStateExtra<T>,
3940 wallclock_lag_metrics: WallclockLagMetrics,
3941 ) -> Self {
3942 let wallclock_lag_histogram_stash = match &data_source {
3946 DataSource::Other => None,
3947 _ => Some(Default::default()),
3948 };
3949
3950 Self {
3951 data_source,
3952 collection_metadata,
3953 extra_state,
3954 wallclock_lag_max: WallclockLag::MIN,
3955 wallclock_lag_histogram_stash,
3956 wallclock_lag_metrics,
3957 }
3958 }
3959}
3960
/// Additional state kept for collections that are ingestions or exports.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    /// State specific to an ingestion.
    Ingestion(IngestionState<T>),
    /// State specific to an export.
    Export(ExportState<T>),
    /// No extra state needed for this kind of collection.
    None,
}
3968
/// State tracked for an ingestion.
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Combined read capabilities held on this ingestion.
    pub read_capabilities: MutableAntichain<T>,

    // NOTE(review): presumably the since frontier derived from the read
    // capabilities — confirm against where this field is maintained.
    pub derived_since: Antichain<T>,

    /// Read holds this ingestion has taken on its dependency collections.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// The ingestion's write frontier.
    pub write_frontier: Antichain<T>,

    /// Policy governing the read holds taken for this ingestion.
    pub hold_policy: ReadPolicy<T>,

    /// ID of the storage instance this ingestion runs on.
    pub instance_id: StorageInstanceId,

    /// Replicas on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}
3999
/// Describes a status-history introspection collection: how long its rows are
/// retained and how to extract a retention key and event time from a row.
struct StatusHistoryDesc<K> {
    /// Retention policy applied to this collection's rows.
    retention_policy: StatusHistoryRetentionPolicy,
    /// Extracts the retention key (e.g. object/replica ID) from a row's datums.
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    /// Extracts the event's occurrence time from a row's datums.
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}
/// Policy controlling how much of a status-history collection is retained.
enum StatusHistoryRetentionPolicy {
    /// Keep only the last N entries (per key).
    LastN(usize),
    /// Keep only entries within the given time window.
    TimeWindow(Duration),
}
4015
4016fn source_status_history_desc(
4017 params: &StorageParameters,
4018) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
4019 let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
4020 let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
4021 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4022 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4023
4024 StatusHistoryDesc {
4025 retention_policy: StatusHistoryRetentionPolicy::LastN(
4026 params.keep_n_source_status_history_entries,
4027 ),
4028 extract_key: Box::new(move |datums| {
4029 (
4030 GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
4031 if datums[replica_id_idx].is_null() {
4032 None
4033 } else {
4034 Some(
4035 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4036 .expect("ReplicaId column"),
4037 )
4038 },
4039 )
4040 }),
4041 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4042 }
4043}
4044
4045fn sink_status_history_desc(
4046 params: &StorageParameters,
4047) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
4048 let desc = &MZ_SINK_STATUS_HISTORY_DESC;
4049 let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
4050 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4051 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4052
4053 StatusHistoryDesc {
4054 retention_policy: StatusHistoryRetentionPolicy::LastN(
4055 params.keep_n_sink_status_history_entries,
4056 ),
4057 extract_key: Box::new(move |datums| {
4058 (
4059 GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
4060 if datums[replica_id_idx].is_null() {
4061 None
4062 } else {
4063 Some(
4064 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4065 .expect("ReplicaId column"),
4066 )
4067 },
4068 )
4069 }),
4070 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4071 }
4072}
4073
4074fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
4075 let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
4076 let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
4077 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4078
4079 StatusHistoryDesc {
4080 retention_policy: StatusHistoryRetentionPolicy::LastN(
4081 params.keep_n_privatelink_status_history_entries,
4082 ),
4083 extract_key: Box::new(move |datums| {
4084 GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4085 }),
4086 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4087 }
4088}
4089
4090fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4091 let desc = &REPLICA_STATUS_HISTORY_DESC;
4092 let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4093 let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4094 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4095
4096 StatusHistoryDesc {
4097 retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4098 params.replica_status_history_retention_window,
4099 ),
4100 extract_key: Box::new(move |datums| {
4101 (
4102 GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4103 datums[process_idx].unwrap_uint64(),
4104 )
4105 }),
4106 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4107 }
4108}
4109
4110fn swap_updates<T: Timestamp>(
4112 from: &mut Antichain<T>,
4113 mut replace_with: Antichain<T>,
4114) -> ChangeBatch<T> {
4115 let mut update = ChangeBatch::new();
4116 if PartialOrder::less_equal(from, &replace_with) {
4117 update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
4118 std::mem::swap(from, &mut replace_with);
4119 update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
4120 }
4121 update
4122}