1use std::any::Any;
13use std::collections::btree_map;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::{Debug, Display};
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use crate::collection_mgmt::{
21 AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
22};
23use crate::instance::{Instance, ReplicaConfig};
24use async_trait::async_trait;
25use chrono::{DateTime, DurationRound, TimeDelta, Utc};
26use derivative::Derivative;
27use differential_dataflow::lattice::Lattice;
28use futures::FutureExt;
29use futures::StreamExt;
30use itertools::Itertools;
31use mz_build_info::BuildInfo;
32use mz_cluster_client::client::ClusterReplicaLocation;
33use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_controller_types::dyncfgs::{
36 ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
37};
38use mz_ore::collections::CollectionExt;
39use mz_ore::metrics::MetricsRegistry;
40use mz_ore::now::{EpochMillis, NowFn};
41use mz_ore::task::AbortOnDropHandle;
42use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
43use mz_persist_client::batch::ProtoBatch;
44use mz_persist_client::cache::PersistClientCache;
45use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
46use mz_persist_client::critical::Opaque;
47use mz_persist_client::read::ReadHandle;
48use mz_persist_client::schema::CaESchema;
49use mz_persist_client::write::WriteHandle;
50use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
51use mz_persist_types::Codec64;
52use mz_persist_types::codec_impls::UnitSchema;
53use mz_repr::adt::timestamp::CheckedTimestamp;
54use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
55use mz_storage_client::client::{
56 AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
57 StatusUpdate, StorageCommand, StorageResponse, TableData,
58};
59use mz_storage_client::controller::{
60 BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
61 IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
62 StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
63};
64use mz_storage_client::healthcheck::{
65 MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
66 MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
67};
68use mz_storage_client::metrics::StorageControllerMetrics;
69use mz_storage_client::statistics::{
70 ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
71};
72use mz_storage_client::storage_collections::StorageCollections;
73use mz_storage_types::configuration::StorageConfiguration;
74use mz_storage_types::connections::ConnectionContext;
75use mz_storage_types::connections::inline::InlinedConnection;
76use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
77use mz_storage_types::errors::CollectionMissing;
78use mz_storage_types::instances::StorageInstanceId;
79use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
80use mz_storage_types::parameters::StorageParameters;
81use mz_storage_types::read_holds::ReadHold;
82use mz_storage_types::read_policy::ReadPolicy;
83use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
84use mz_storage_types::sources::{
85 GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
86 SourceExport, SourceExportDataConfig,
87};
88use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
89use mz_txn_wal::metrics::Metrics as TxnMetrics;
90use mz_txn_wal::txn_read::TxnsRead;
91use mz_txn_wal::txns::TxnsHandle;
92use timely::order::{PartialOrder, TotalOrder};
93use timely::progress::Timestamp as TimelyTimestamp;
94use timely::progress::frontier::MutableAntichain;
95use timely::progress::{Antichain, ChangeBatch, Timestamp};
96use tokio::sync::watch::{Sender, channel};
97use tokio::sync::{mpsc, oneshot};
98use tokio::time::MissedTickBehavior;
99use tokio::time::error::Elapsed;
100use tracing::{debug, info, warn};
101
102mod collection_mgmt;
103mod history;
104mod instance;
105mod persist_handles;
106mod rtr;
107mod statistics;
108
/// A oneshot ingestion that has been started on a storage cluster but whose
/// result has not yet been delivered back to the caller.
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to deliver the final result (batches or errors) to the
    /// caller. Not `Debug`, hence excluded from the derived impl.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// The storage cluster on which the ingestion is running.
    cluster_id: StorageInstanceId,
}
118
impl PendingOneshotIngestion {
    /// Consumes the pending ingestion, notifying the caller that it was
    /// canceled via a single `Err("canceled")` entry.
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}
127
/// The storage controller: tracks collection state, owns the connections to
/// storage instances, and mediates between the adapter and persist.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// Build information of the running binary, forwarded to replicas.
    build_info: &'static BuildInfo,
    /// Clock used for wall-clock timestamps (status updates, lag recording).
    now: NowFn,

    /// Whether this controller runs in read-only mode (e.g. during a 0dt
    /// deployment); gates writes such as status updates and table registers.
    read_only: bool,

    /// Per-collection state, keyed by collection id.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Dropped objects for which we are still awaiting acknowledgment from
    /// the listed replicas.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Background worker that performs persist writes for tables.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// Read-side handle to the txn-wal shard.
    txns_read: TxnsRead<T>,
    /// Metrics for txn-wal operations.
    txns_metrics: Arc<TxnMetrics>,
    /// Responses received from instances, stashed until processed.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Sender for table write handles that should be dropped asynchronously.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Receiver side of `pending_table_handle_drops_tx`.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// In-flight oneshot ingestions, keyed by their ingestion uuid.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Manager for controller-maintained (introspection/webhook) collections.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Mapping from introspection type to the collection backing it.
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens that keep introspection machinery alive; dropping a token
    /// tears down the associated task.
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    /// Shared statistics state for sources and webhooks.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Per-(sink, replica) statistics state.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// Watch channel broadcasting the current statistics collection interval.
    statistics_interval_sender: Sender<Duration>,

    /// Running storage instances (clusters), keyed by instance id.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Whether `initialization_complete` has been called.
    initialized: bool,
    /// Current storage configuration/parameters.
    config: StorageConfiguration,
    /// Location of the persist backing store.
    persist_location: PersistLocation,
    /// Cache of persist clients, shared with other controller components.
    persist: Arc<PersistClientCache>,
    /// Controller-level metrics.
    metrics: StorageControllerMetrics,
    /// Last (read, write) frontiers recorded per collection, used to diff
    /// against new frontiers when updating introspection.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// Last per-replica write frontiers recorded, same purpose as above.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// Function computing wall-clock lag from a frontier timestamp.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// When wall-clock lag was last recorded, truncated to the recording
    /// interval.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Frontend tracking collection existence, frontiers, and read holds.
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Collections migrated as part of the current deployment.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker driving periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether a maintenance pass is currently scheduled.
    maintenance_scheduled: bool,

    /// Sender handed to instances for reporting responses back to us.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receiver side of `instance_response_tx`.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Handle to the background task warming persist state; aborted on drop,
    /// and dropped eagerly once bootstrap collection creation begins.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}
242
243fn warm_persist_state_in_background(
248 client: PersistClient,
249 shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
250) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
251 const MAX_CONCURRENT_WARMS: usize = 16;
253 let logic = async move {
254 let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
255 .map(|shard_id| {
256 let client = client.clone();
257 async move {
258 client
259 .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
260 shard_id,
261 Arc::new(RelationDesc::empty()),
262 Arc::new(UnitSchema),
263 true,
264 Diagnostics::from_purpose("warm persist load state"),
265 )
266 .await
267 }
268 })
269 .buffer_unordered(MAX_CONCURRENT_WARMS)
270 .collect()
271 .await;
272 let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
273 fetchers
274 };
275 mz_ore::task::spawn(|| "warm_persist_load_state", logic)
276}
277
278#[async_trait(?Send)]
279impl<T> StorageController for Controller<T>
280where
281 T: Timestamp
282 + Lattice
283 + TotalOrder
284 + Codec64
285 + From<EpochMillis>
286 + TimestampManipulation
287 + Into<Datum<'static>>
288 + Display,
289{
290 type Timestamp = T;
291
292 fn initialization_complete(&mut self) {
293 self.reconcile_dangling_statistics();
294 self.initialized = true;
295
296 for instance in self.instances.values_mut() {
297 instance.send(StorageCommand::InitializationComplete);
298 }
299 }
300
    /// Applies a new set of storage parameters, fanning them out to all
    /// interested components. The order of propagation below is deliberate.
    fn update_parameters(&mut self, config_params: StorageParameters) {
        // Update the collections frontend first.
        self.storage_collections
            .update_parameters(config_params.clone());

        // Apply dyncfg updates to the shared persist cache so all persist
        // handles observe them.
        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        // Forward the full parameter set to every running storage instance.
        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        // Fold the parameters into our local config, then push the derived
        // settings to background workers. NOTE(review): `update` presumably
        // merges rather than replaces — confirm in `StorageConfiguration`.
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }
322
    /// Returns the controller's current storage configuration.
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }
327
    /// Returns the persist-level metadata of the given collection, delegating
    /// to the `StorageCollections` frontend.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        self.storage_collections.collection_metadata(id)
    }
331
332 fn collection_hydrated(
333 &self,
334 collection_id: GlobalId,
335 ) -> Result<bool, StorageError<Self::Timestamp>> {
336 let collection = self.collection(collection_id)?;
337
338 let instance_id = match &collection.data_source {
339 DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
340 DataSource::IngestionExport { ingestion_id, .. } => {
341 let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
342
343 let instance_id = match &ingestion_state.data_source {
344 DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
345 _ => unreachable!("SourceExport must only refer to primary source"),
346 };
347
348 instance_id
349 }
350 _ => return Ok(true),
351 };
352
353 let instance = self.instances.get(&instance_id).ok_or_else(|| {
354 StorageError::IngestionInstanceMissing {
355 storage_instance_id: instance_id,
356 ingestion_id: collection_id,
357 }
358 })?;
359
360 if instance.replica_ids().next().is_none() {
361 return Ok(true);
364 }
365
366 match &collection.extra_state {
367 CollectionStateExtra::Ingestion(ingestion_state) => {
368 Ok(ingestion_state.hydrated_on.len() >= 1)
370 }
371 CollectionStateExtra::Export(_) => {
372 Ok(true)
377 }
378 CollectionStateExtra::None => {
379 Ok(true)
383 }
384 }
385 }
386
387 #[mz_ore::instrument(level = "debug")]
388 fn collections_hydrated_on_replicas(
389 &self,
390 target_replica_ids: Option<Vec<ReplicaId>>,
391 target_cluster_id: &StorageInstanceId,
392 exclude_collections: &BTreeSet<GlobalId>,
393 ) -> Result<bool, StorageError<Self::Timestamp>> {
394 if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
397 return Ok(true);
398 }
399
400 let target_replicas: Option<BTreeSet<ReplicaId>> =
403 target_replica_ids.map(|ids| ids.into_iter().collect());
404
405 let mut all_hydrated = true;
406 for (collection_id, collection_state) in self.collections.iter() {
407 if collection_id.is_transient() || exclude_collections.contains(collection_id) {
408 continue;
409 }
410 let hydrated = match &collection_state.extra_state {
411 CollectionStateExtra::Ingestion(state) => {
412 if &state.instance_id != target_cluster_id {
413 continue;
414 }
415 match &target_replicas {
416 Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
417 None => {
418 state.hydrated_on.len() >= 1
421 }
422 }
423 }
424 CollectionStateExtra::Export(_) => {
425 true
430 }
431 CollectionStateExtra::None => {
432 true
436 }
437 };
438 if !hydrated {
439 tracing::info!(%collection_id, "collection is not hydrated on any replica");
440 all_hydrated = false;
441 }
444 }
445 Ok(all_hydrated)
446 }
447
448 fn collection_frontiers(
449 &self,
450 id: GlobalId,
451 ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing> {
452 let frontiers = self.storage_collections.collection_frontiers(id)?;
453 Ok((frontiers.implied_capability, frontiers.write_frontier))
454 }
455
456 fn collections_frontiers(
457 &self,
458 mut ids: Vec<GlobalId>,
459 ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, CollectionMissing> {
460 let mut result = vec![];
461 ids.retain(|&id| match self.export(id) {
466 Ok(export) => {
467 result.push((
468 id,
469 export.input_hold().since().clone(),
470 export.write_frontier.clone(),
471 ));
472 false
473 }
474 Err(_) => true,
475 });
476 result.extend(
477 self.storage_collections
478 .collections_frontiers(ids)?
479 .into_iter()
480 .map(|frontiers| {
481 (
482 frontiers.id,
483 frontiers.implied_capability,
484 frontiers.write_frontier,
485 )
486 }),
487 );
488
489 Ok(result)
490 }
491
    /// Returns the metadata of all active (non-dropped) collections.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }
495
496 fn active_ingestion_exports(
497 &self,
498 instance_id: StorageInstanceId,
499 ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
500 let active_storage_collections: BTreeMap<_, _> = self
501 .storage_collections
502 .active_collection_frontiers()
503 .into_iter()
504 .map(|c| (c.id, c))
505 .collect();
506
507 let active_exports = self.instances[&instance_id]
508 .active_ingestion_exports()
509 .filter(move |id| {
510 let frontiers = active_storage_collections.get(id);
511 match frontiers {
512 Some(frontiers) => !frontiers.write_frontier.is_empty(),
513 None => {
514 false
516 }
517 }
518 });
519
520 Box::new(active_exports)
521 }
522
    /// Ensures the given collection exists, returning an error otherwise.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }
526
527 fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
528 let metrics = self.metrics.for_instance(id);
529 let mut instance = Instance::new(
530 workload_class,
531 metrics,
532 self.now.clone(),
533 self.instance_response_tx.clone(),
534 );
535 if self.initialized {
536 instance.send(StorageCommand::InitializationComplete);
537 }
538 if !self.read_only {
539 instance.send(StorageCommand::AllowWrites);
540 }
541
542 let params = Box::new(self.config.parameters.clone());
543 instance.send(StorageCommand::UpdateConfiguration(params));
544
545 let old_instance = self.instances.insert(id, instance);
546 assert_none!(old_instance, "storage instance {id} already exists");
547 }
548
549 fn drop_instance(&mut self, id: StorageInstanceId) {
550 let instance = self.instances.remove(&id);
551 assert!(instance.is_some(), "storage instance {id} does not exist");
552 }
553
554 fn update_instance_workload_class(
555 &mut self,
556 id: StorageInstanceId,
557 workload_class: Option<String>,
558 ) {
559 let instance = self
560 .instances
561 .get_mut(&id)
562 .unwrap_or_else(|| panic!("instance {id} does not exist"));
563
564 instance.workload_class = workload_class;
565 }
566
567 fn connect_replica(
568 &mut self,
569 instance_id: StorageInstanceId,
570 replica_id: ReplicaId,
571 location: ClusterReplicaLocation,
572 ) {
573 let instance = self
574 .instances
575 .get_mut(&instance_id)
576 .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
577
578 let config = ReplicaConfig {
579 build_info: self.build_info,
580 location,
581 grpc_client: self.config.parameters.grpc_client.clone(),
582 };
583 instance.add_replica(replica_id, config);
584 }
585
    /// Drops a replica from a storage instance, emitting `Paused` status
    /// updates for all sources and sinks that were running on it (unless the
    /// controller is read-only).
    ///
    /// # Panics
    ///
    /// Panics if no instance with the given id exists, or if the instance
    /// references an ingestion unknown to the controller.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        // Shared template for the `Paused` status rows we emit below.
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            // This replica can no longer acknowledge the drop; forget the
            // whole entry once no replica is left to wait for.
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // "Old style" ingestions have a remap collection distinct from the
            // ingestion itself; we don't emit a status update for the remap
            // collection in that case.
            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            // Same drop-acknowledgment bookkeeping as for ingestions above.
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        // Status history is only written when we are allowed to write.
        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }
672
    /// Evolves the registered persist schemas of the given collections to the
    /// provided `RelationDesc`s as part of bootstrap (e.g. to reflect changed
    /// column nullability).
    ///
    /// Collections without a registered schema are skipped. Returns
    /// `PersistSchemaEvolveRace` if another process evolved a schema
    /// concurrently, and `PersistInvalidSchemaEvolve` if the new schema is
    /// incompatible with the registered one.
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // NOTE(review): `unwrap` assumes opening the persist location cannot
        // fail during bootstrap — confirm this invariant.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            // Look up the currently registered schema, if any.
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            // Compare-and-swap the schema, expecting `schema_id` to still be
            // the latest registered schema.
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                // Someone evolved the schema concurrently; surface the race.
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }
739
    /// Creates controller state for the given collections, opens their persist
    /// handles, and (where permitted) starts running their dataflows.
    ///
    /// `register_ts` must be `Some` whenever `collections` contains a table.
    /// Collections listed in `migrated_storage_collections` receive special
    /// treatment in read-only mode (they are still registered with the table
    /// worker and exempt from the dependency-since sanity check).
    ///
    /// Returns `CollectionIdReused` if the same id appears twice with
    /// different descriptions.
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        // Let the collections frontend set up its own state first.
        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // Warm-up has served its purpose; abort any remaining background work.
        drop(self.persist_warm_task.take());

        // Reject ids that appear more than once with differing descriptions.
        // (`dedup` removes exact duplicates, so any remaining neighbor pair
        // with equal ids must differ in description.)
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // Attach persist metadata (shard, schema, txns participation) to each
        // collection.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Open write handles for all collections concurrently; opening them
        // serially would dominate bootstrap time.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        let mut to_execute = BTreeSet::new();
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        to_register.sort_by_key(|(id, ..)| *id);

        // Process tables first (in reverse id order), then everything else.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics entries to create for the new objects. NOTE(review):
        // only the webhook entries are consumed in this function; the source
        // and sink sets are presumably picked up elsewhere — confirm.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            // A collection participates in txn-wal unless we are read-only
            // and it is being migrated.
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            // Acquire read holds on everything this collection depends on and
            // compute the joined since of those holds.
            let storage_dependencies = self.determine_collection_dependencies(id, &description)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            let data_source = description.data_source;

            // Sanity-check the dependency frontiers for collections that read
            // from their dependencies directly (not via txn-wal, not sinks).
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                if description.primary.is_none() {
                    mz_ore::soft_assert_or_log!(
                        write_frontier.elements() == &[T::minimum()]
                            || write_frontier.is_empty()
                            || PartialOrder::less_than(&dependency_since, write_frontier),
                        "dependency since has advanced past dependent ({id}) upper \n
                        dependent ({id}): upper {:?} \n
                        dependency since {:?} \n
                        dependency read holds: {:?}",
                        write_frontier,
                        dependency_since,
                        dependency_read_holds,
                    );
                }
            }

            // Per-data-source registration: who owns the write handle and
            // what extra controller state the collection needs.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Install the export on its primary source's description
                    // and re-execute the source instead of the export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    // Sinks hold exactly two read holds: one on themselves and
                    // one on their input collection.
                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }

        }

        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode only migrated tables are registered.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        // Record the id-to-shard mapping for the new collections.
        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // Finally, start running dataflows where allowed. In read-only mode
        // only sources that explicitly support read-only execution run.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }
1225
1226 fn check_alter_ingestion_source_desc(
1227 &mut self,
1228 ingestion_id: GlobalId,
1229 source_desc: &SourceDesc,
1230 ) -> Result<(), StorageError<Self::Timestamp>> {
1231 let source_collection = self.collection(ingestion_id)?;
1232 let data_source = &source_collection.data_source;
1233 match &data_source {
1234 DataSource::Ingestion(cur_ingestion) => {
1235 cur_ingestion
1236 .desc
1237 .alter_compatible(ingestion_id, source_desc)?;
1238 }
1239 o => {
1240 tracing::info!(
1241 "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1242 o
1243 );
1244 Err(AlterError { id: ingestion_id })?
1245 }
1246 }
1247
1248 Ok(())
1249 }
1250
1251 async fn alter_ingestion_source_desc(
1252 &mut self,
1253 ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
1254 ) -> Result<(), StorageError<Self::Timestamp>> {
1255 let mut ingestions_to_run = BTreeSet::new();
1256
1257 for (id, new_desc) in ingestion_ids {
1258 let collection = self
1259 .collections
1260 .get_mut(&id)
1261 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1262
1263 match &mut collection.data_source {
1264 DataSource::Ingestion(ingestion) => {
1265 if ingestion.desc != new_desc {
1266 tracing::info!(
1267 from = ?ingestion.desc,
1268 to = ?new_desc,
1269 "alter_ingestion_source_desc, updating"
1270 );
1271 ingestion.desc = new_desc;
1272 ingestions_to_run.insert(id);
1273 }
1274 }
1275 o => {
1276 tracing::warn!("alter_ingestion_source_desc called on {:?}", o);
1277 Err(StorageError::IdentifierInvalid(id))?;
1278 }
1279 }
1280 }
1281
1282 for id in ingestions_to_run {
1283 self.run_ingestion(id)?;
1284 }
1285 Ok(())
1286 }
1287
1288 async fn alter_ingestion_connections(
1289 &mut self,
1290 source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1291 ) -> Result<(), StorageError<Self::Timestamp>> {
1292 let mut ingestions_to_run = BTreeSet::new();
1293
1294 for (id, conn) in source_connections {
1295 let collection = self
1296 .collections
1297 .get_mut(&id)
1298 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1299
1300 match &mut collection.data_source {
1301 DataSource::Ingestion(ingestion) => {
1302 if ingestion.desc.connection != conn {
1305 tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1306 ingestion.desc.connection = conn;
1307 ingestions_to_run.insert(id);
1308 } else {
1309 tracing::warn!(
1310 "update_source_connection called on {id} but the \
1311 connection was the same"
1312 );
1313 }
1314 }
1315 o => {
1316 tracing::warn!("update_source_connection called on {:?}", o);
1317 Err(StorageError::IdentifierInvalid(id))?;
1318 }
1319 }
1320 }
1321
1322 for id in ingestions_to_run {
1323 self.run_ingestion(id)?;
1324 }
1325 Ok(())
1326 }
1327
    /// Updates the data config of the given source exports (subsources/tables)
    /// and re-runs the affected primary ingestions.
    ///
    /// The config is written in two places: on the export's own collection
    /// state and in the primary ingestion's `source_exports` map; both must
    /// stay in sync.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Primary ingestions that saw an actual change and must be re-executed.
        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // Step 1: update the config on the export collection itself and
            // learn which primary ingestion it belongs to.
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    // Only ingestion exports carry a per-export data config.
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Step 2: mirror the change into the primary ingestion's view of
            // its exports.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // Only schedule a re-run if something actually changed.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }
1396
    /// Evolves the schema of an existing table: registers `new_collection` as a
    /// new (versioned) collection backed by the *same* persist data shard as
    /// `existing_collection`, but with the new relation description.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            // Destructure so we can borrow `collections` and
            // `storage_collections` independently.
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            // Only plain tables support in-place schema evolution.
            if existing.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Let the storage-collections layer evolve the persist schema
            // first; it validates `expected_version`.
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            // The new collection reuses the existing table's data shard.
            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            // Tables are written through the txn-wal system.
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            DataSource::Table,
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        self.collections.insert(new_collection, collection_state);

        // Hand the write handle to the table worker at `register_ts` so writes
        // to the new version become possible.
        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        // Record the id→shard mapping for introspection.
        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }
1475
1476 fn export(
1477 &self,
1478 id: GlobalId,
1479 ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1480 self.collections
1481 .get(&id)
1482 .and_then(|c| match &c.extra_state {
1483 CollectionStateExtra::Export(state) => Some(state),
1484 _ => None,
1485 })
1486 .ok_or(StorageError::IdentifierMissing(id))
1487 }
1488
1489 fn export_mut(
1490 &mut self,
1491 id: GlobalId,
1492 ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1493 self.collections
1494 .get_mut(&id)
1495 .and_then(|c| match &mut c.extra_state {
1496 CollectionStateExtra::Export(state) => Some(state),
1497 _ => None,
1498 })
1499 .ok_or(StorageError::IdentifierMissing(id))
1500 }
1501
1502 async fn create_oneshot_ingestion(
1504 &mut self,
1505 ingestion_id: uuid::Uuid,
1506 collection_id: GlobalId,
1507 instance_id: StorageInstanceId,
1508 request: OneshotIngestionRequest,
1509 result_tx: OneshotResultCallback<ProtoBatch>,
1510 ) -> Result<(), StorageError<Self::Timestamp>> {
1511 let collection_meta = self
1512 .collections
1513 .get(&collection_id)
1514 .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1515 .collection_metadata
1516 .clone();
1517 let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1518 StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1520 })?;
1521 let oneshot_cmd = RunOneshotIngestion {
1522 ingestion_id,
1523 collection_id,
1524 collection_meta,
1525 request,
1526 };
1527
1528 if !self.read_only {
1529 instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1530 let pending = PendingOneshotIngestion {
1531 result_tx,
1532 cluster_id: instance_id,
1533 };
1534 let novel = self
1535 .pending_oneshot_ingestions
1536 .insert(ingestion_id, pending);
1537 assert_none!(novel);
1538 Ok(())
1539 } else {
1540 Err(StorageError::ReadOnly)
1541 }
1542 }
1543
1544 fn cancel_oneshot_ingestion(
1545 &mut self,
1546 ingestion_id: uuid::Uuid,
1547 ) -> Result<(), StorageError<Self::Timestamp>> {
1548 if self.read_only {
1549 return Err(StorageError::ReadOnly);
1550 }
1551
1552 let pending = self
1553 .pending_oneshot_ingestions
1554 .remove(&ingestion_id)
1555 .ok_or_else(|| {
1556 StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1558 })?;
1559
1560 match self.instances.get_mut(&pending.cluster_id) {
1561 Some(instance) => {
1562 instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1563 }
1564 None => {
1565 mz_ore::soft_panic_or_log!(
1566 "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1567 ingestion_id,
1568 pending.cluster_id,
1569 );
1570 }
1571 }
1572 pending.cancel();
1574
1575 Ok(())
1576 }
1577
1578 async fn alter_export(
1579 &mut self,
1580 id: GlobalId,
1581 new_description: ExportDescription<Self::Timestamp>,
1582 ) -> Result<(), StorageError<Self::Timestamp>> {
1583 let from_id = new_description.sink.from;
1584
1585 let desired_read_holds = vec![from_id.clone(), id.clone()];
1588 let [input_hold, self_hold] = self
1589 .storage_collections
1590 .acquire_read_holds(desired_read_holds)
1591 .expect("missing dependency")
1592 .try_into()
1593 .expect("expected number of holds");
1594 let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1595 let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1596
1597 let cur_export = self.export_mut(id)?;
1599 let input_readable = cur_export
1600 .write_frontier
1601 .iter()
1602 .all(|t| input_hold.since().less_than(t));
1603 if !input_readable {
1604 return Err(StorageError::ReadBeforeSince(from_id));
1605 }
1606
1607 let new_export = ExportState {
1608 read_capabilities: cur_export.read_capabilities.clone(),
1609 cluster_id: new_description.instance_id,
1610 derived_since: cur_export.derived_since.clone(),
1611 read_holds: [input_hold, self_hold],
1612 read_policy: cur_export.read_policy.clone(),
1613 write_frontier: cur_export.write_frontier.clone(),
1614 };
1615 *cur_export = new_export;
1616
1617 let with_snapshot = new_description.sink.with_snapshot
1624 && !PartialOrder::less_than(&new_description.sink.as_of, &cur_export.write_frontier);
1625
1626 let cmd = RunSinkCommand {
1627 id,
1628 description: StorageSinkDesc {
1629 from: from_id,
1630 from_desc: new_description.sink.from_desc,
1631 connection: new_description.sink.connection,
1632 envelope: new_description.sink.envelope,
1633 as_of: new_description.sink.as_of,
1634 version: new_description.sink.version,
1635 from_storage_metadata,
1636 with_snapshot,
1637 to_storage_metadata,
1638 commit_interval: new_description.sink.commit_interval,
1639 },
1640 };
1641
1642 let instance = self
1644 .instances
1645 .get_mut(&new_description.instance_id)
1646 .ok_or_else(|| StorageError::ExportInstanceMissing {
1647 storage_instance_id: new_description.instance_id,
1648 export_id: id,
1649 })?;
1650
1651 instance.send(StorageCommand::RunSink(Box::new(cmd)));
1652 Ok(())
1653 }
1654
    /// Swaps the connection of one or more existing exports, re-issuing each
    /// affected sink dataflow with the updated connection.
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Commands and their updated descriptions, grouped by the instance
        // each export runs on.
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            // Build the updated description from current collection state; the
            // as-of is taken from the export's existing input read hold.
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Only connection changes compatible with the running sink are
            // permitted.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                    commit_interval: new_export_description.sink.commit_interval,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the instance up front so no command is sent to a
            // half-missing cluster.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Commit the new descriptions to controller state only after the
            // commands have been sent.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }
1766
1767 fn drop_tables(
1782 &mut self,
1783 storage_metadata: &StorageMetadata,
1784 identifiers: Vec<GlobalId>,
1785 ts: Self::Timestamp,
1786 ) -> Result<(), StorageError<Self::Timestamp>> {
1787 let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1789 .into_iter()
1790 .partition(|id| match self.collections[id].data_source {
1791 DataSource::Table => true,
1792 DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1793 _ => panic!("identifier is not a table: {}", id),
1794 });
1795
1796 if table_write_ids.len() > 0 {
1798 let drop_notif = self
1799 .persist_table_worker
1800 .drop_handles(table_write_ids.clone(), ts);
1801 let tx = self.pending_table_handle_drops_tx.clone();
1802 mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1803 drop_notif.await;
1804 for identifier in table_write_ids {
1805 let _ = tx.send(identifier);
1806 }
1807 });
1808 }
1809
1810 if data_source_ids.len() > 0 {
1812 self.validate_collection_ids(data_source_ids.iter().cloned())?;
1813 self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1814 }
1815
1816 Ok(())
1817 }
1818
1819 fn drop_sources(
1820 &mut self,
1821 storage_metadata: &StorageMetadata,
1822 identifiers: Vec<GlobalId>,
1823 ) -> Result<(), StorageError<Self::Timestamp>> {
1824 self.validate_collection_ids(identifiers.iter().cloned())?;
1825 self.drop_sources_unvalidated(storage_metadata, identifiers)
1826 }
1827
    /// Drops the given source-like collections without validating the ids.
    ///
    /// Handles each data-source kind appropriately, releases read holds,
    /// records status/introspection updates, and finally forwards the drop to
    /// the storage-collections layer.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Primary sources that must be re-run because one of their exports was
        // dropped (but the source itself survives).
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        // Non-ingestion collections (webhooks, tables, progress, other).
        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Webhook writes go through the collection manager;
                        // unregister asynchronously.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping an export changes the primary source's
                        // description, so it must be re-executed.
                        ingestions_to_execute.insert(ingestion_id);

                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        // Remove this export from its primary source's map.
                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        // These kinds must be dropped through other entry
                        // points; log loudly instead of corrupting state.
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Don't re-run sources that are themselves being dropped.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Releasing all read holds (policy valid-from the empty frontier)
        // eventually allows full compaction and shuts the dataflows down.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Retract the id→shard mappings for everything being dropped.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            // Scope the statistics lock tightly.
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                // Track which replicas still need to acknowledge the drop so
                // late responses for these ids aren't treated as unexpected.
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            // The remap collection is dropped along with its
                            // ingestion; expect acks for it too.
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }
2023
2024 fn drop_sinks(
2026 &mut self,
2027 storage_metadata: &StorageMetadata,
2028 identifiers: Vec<GlobalId>,
2029 ) -> Result<(), StorageError<Self::Timestamp>> {
2030 self.validate_export_ids(identifiers.iter().cloned())?;
2031 self.drop_sinks_unvalidated(storage_metadata, identifiers);
2032 Ok(())
2033 }
2034
2035 fn drop_sinks_unvalidated(
2036 &mut self,
2037 storage_metadata: &StorageMetadata,
2038 mut sinks_to_drop: Vec<GlobalId>,
2039 ) {
2040 sinks_to_drop.retain(|id| self.export(*id).is_ok());
2042
2043 let drop_policy = sinks_to_drop
2050 .iter()
2051 .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2052 .collect();
2053
2054 tracing::debug!(
2055 ?drop_policy,
2056 "dropping sources by setting read hold policies"
2057 );
2058 self.set_hold_policies(drop_policy);
2059
2060 let status_now = mz_ore::now::to_datetime((self.now)());
2067
2068 let mut status_updates = vec![];
2070 {
2071 let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2072 for id in sinks_to_drop.iter() {
2073 status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2074 sink_statistics.retain(|(stats_id, _), _| stats_id != id);
2075 }
2076 }
2077
2078 if !self.read_only {
2079 self.append_status_introspection_updates(
2080 IntrospectionType::SinkStatusHistory,
2081 status_updates,
2082 );
2083 }
2084
2085 for id in sinks_to_drop.iter() {
2087 tracing::info!(%id, "dropping export state");
2088 let collection = self
2089 .collections
2090 .remove(id)
2091 .expect("list populated after checking that self.collections contains it");
2092
2093 let instance = match &collection.extra_state {
2094 CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2095 CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2096 CollectionStateExtra::None => None,
2097 }
2098 .and_then(|i| self.instances.get(&i));
2099
2100 if let Some(instance) = instance {
2104 let active_replicas = instance.get_active_replicas_for_object(id);
2105 if !active_replicas.is_empty() {
2106 self.dropped_objects.insert(*id, active_replicas);
2107 }
2108 }
2109 }
2110
2111 self.storage_collections
2113 .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2114 }
2115
2116 #[instrument(level = "debug")]
2117 fn append_table(
2118 &mut self,
2119 write_ts: Self::Timestamp,
2120 advance_to: Self::Timestamp,
2121 commands: Vec<(GlobalId, Vec<TableData>)>,
2122 ) -> Result<
2123 tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2124 StorageError<Self::Timestamp>,
2125 > {
2126 if self.read_only {
2127 if !commands
2130 .iter()
2131 .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2132 {
2133 return Err(StorageError::ReadOnly);
2134 }
2135 }
2136
2137 for (id, updates) in commands.iter() {
2139 if !updates.is_empty() {
2140 if !write_ts.less_than(&advance_to) {
2141 return Err(StorageError::UpdateBeyondUpper(*id));
2142 }
2143 }
2144 }
2145
2146 Ok(self
2147 .persist_table_worker
2148 .append(write_ts, advance_to, commands))
2149 }
2150
    /// Returns an appender for monotonic (append-only) writes to the given
    /// collection, delegating to the collection manager.
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }
2157
2158 fn webhook_statistics(
2159 &self,
2160 id: GlobalId,
2161 ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2162 let source_statistics = self.source_statistics.lock().expect("poisoned");
2164 source_statistics
2165 .webhook_statistics
2166 .get(&id)
2167 .cloned()
2168 .ok_or(StorageError::IdentifierMissing(id))
2169 }
2170
    /// Waits until `process` has something to do: a maintenance tick, an
    /// instance response, or pending table-handle drops. Returns immediately
    /// when work is already queued.
    async fn ready(&mut self) {
        // Maintenance already due: let the caller run `process` right away.
        if self.maintenance_scheduled {
            return;
        }

        // Pending table-handle drops are also work for `process`.
        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                // Greedily drain whatever else is already queued so one
                // `process` call handles the whole batch.
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }
2192
    /// Processes stashed replica responses and due maintenance, returning any
    /// frontier updates to report to the caller.
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        // Run maintenance if `ready` scheduled it.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        // Take the batch of responses stashed by `ready`.
        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    // A replica acknowledged a drop; once all replicas have,
                    // forget the object entirely.
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    // Source statistics, keyed by (collection, replica).
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // Ignore stats for collections we no longer know.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    // Sink statistics, same structure.
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    // Maintain per-ingestion hydration tracking from status
                    // transitions before recording the update itself.
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            if ingestion_state.hydrated_on.is_empty() {
                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                            }
                                            ingestion_state.hydrated_on.insert(replica_id.expect(
                                                "replica id should be present for status running",
                                            ));
                                        }
                                        CollectionStateExtra::Export(_) => {
                                        }
                                        CollectionStateExtra::None => {
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            // Paused means no replica is
                                            // running it: clear hydration.
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                            ingestion_state.hydrated_on.clear();
                                        }
                                        CollectionStateExtra::Export(_) => {
                                        }
                                        CollectionStateExtra::None => {
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Tell the cluster it can clean up the
                                // now-finished oneshot ingestion.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                (pending.result_tx)(batches)
                            }
                            None => {
                            }
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        // Finish cleanup for tables whose write handles have been dropped.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }
2453
2454 async fn inspect_persist_state(
2455 &self,
2456 id: GlobalId,
2457 ) -> Result<serde_json::Value, anyhow::Error> {
2458 let collection = &self.storage_collections.collection_metadata(id)?;
2459 let client = self
2460 .persist
2461 .open(collection.persist_location.clone())
2462 .await?;
2463 let shard_state = client
2464 .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2465 .await?;
2466 let json_state = serde_json::to_value(shard_state)?;
2467 Ok(json_state)
2468 }
2469
2470 fn append_introspection_updates(
2471 &mut self,
2472 type_: IntrospectionType,
2473 updates: Vec<(Row, Diff)>,
2474 ) {
2475 let id = self.introspection_ids[&type_];
2476 let updates = updates.into_iter().map(|update| update.into()).collect();
2477 self.collection_manager.blind_write(id, updates);
2478 }
2479
2480 fn append_status_introspection_updates(
2481 &mut self,
2482 type_: IntrospectionType,
2483 updates: Vec<StatusUpdate>,
2484 ) {
2485 let id = self.introspection_ids[&type_];
2486 let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2487 if !updates.is_empty() {
2488 self.collection_manager.blind_write(id, updates);
2489 }
2490 }
2491
    /// Applies a differential write op to the introspection collection of the
    /// given type. Panics if the type was never registered.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }
2496
    /// Returns a sender for append-only writes to the introspection collection
    /// of the given type. Panics if the type was never registered.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }
2507
    /// Returns a sender for differential writes to the introspection
    /// collection of the given type. Panics if the type was never registered.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }
2518
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        // One RTR future per eligible user source, keyed by collection ID.
        let mut rtr_futures = BTreeMap::new();

        // Only user collections can have an upstream to probe; system IDs are
        // filtered out up front.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                // Unknown collections are skipped rather than erroring;
                // presumably the set may contain non-source objects — TODO confirm.
                Err(_) => continue,
            };

            // Only ingestions with a probeable external system participate;
            // load generators have no upstream to ask.
            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold back the remap shard's since while the RTR future runs, so
            // the subscribe below stays valid.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                // An empty since means the shard is closed; we cannot read.
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                // Each per-source probe is individually bounded by `timeout`.
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(
                        source_conn,
                        id,
                        config,
                        as_of,
                        remap_subscribe,
                    )
                    .await
                    .map_err(|e| {
                        tracing::debug!(?id, "real time recency error: {:?}", e);
                        e
                    });

                    // Keep the read hold alive until the probe has finished.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        // The caller gets a future that resolves to the maximum RTR timestamp
        // across all probed sources (or the first error/timeout).
        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
2637
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self` exhaustively (no `..`) so that adding a field to
        // the controller forces an explicit decision about whether it should
        // appear in the dump.
        let Self {
            build_info: _,
            now: _,
            read_only,
            collections,
            dropped_objects,
            persist_table_worker: _,
            txns_read: _,
            txns_metrics: _,
            stashed_responses,
            pending_table_handle_drops_tx: _,
            pending_table_handle_drops_rx: _,
            pending_oneshot_ingestions,
            collection_manager: _,
            introspection_ids,
            introspection_tokens: _,
            source_statistics: _,
            sink_statistics: _,
            statistics_interval_sender: _,
            instances,
            initialized,
            config,
            persist_location,
            persist: _,
            metrics: _,
            recorded_frontiers,
            recorded_replica_frontiers,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            storage_collections: _,
            migrated_storage_collections,
            maintenance_ticker: _,
            maintenance_scheduled,
            instance_response_tx: _,
            instance_response_rx: _,
            persist_warm_task: _,
        } = self;

        // Keys and values are stringified via `Display`/`Debug` so everything
        // below serializes with plain `serde_json::json!`.
        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let dropped_objects: BTreeMap<_, _> = dropped_objects
            .iter()
            .map(|(id, rs)| (id.to_string(), format!("{rs:?}")))
            .collect();
        let stashed_responses: Vec<_> =
            stashed_responses.iter().map(|r| format!("{r:?}")).collect();
        let pending_oneshot_ingestions: BTreeMap<_, _> = pending_oneshot_ingestions
            .iter()
            .map(|(uuid, i)| (uuid.to_string(), format!("{i:?}")))
            .collect();
        let introspection_ids: BTreeMap<_, _> = introspection_ids
            .iter()
            .map(|(typ, id)| (format!("{typ:?}"), id.to_string()))
            .collect();
        let instances: BTreeMap<_, _> = instances
            .iter()
            .map(|(id, i)| (id.to_string(), format!("{i:?}")))
            .collect();
        let recorded_frontiers: BTreeMap<_, _> = recorded_frontiers
            .iter()
            .map(|(id, fs)| (id.to_string(), format!("{fs:?}")))
            .collect();
        let recorded_replica_frontiers: Vec<_> = recorded_replica_frontiers
            .iter()
            .map(|((gid, rid), f)| (gid.to_string(), rid.to_string(), format!("{f:?}")))
            .collect();
        let migrated_storage_collections: Vec<_> = migrated_storage_collections
            .iter()
            .map(|id| id.to_string())
            .collect();

        Ok(serde_json::json!({
            "read_only": read_only,
            "collections": collections,
            "dropped_objects": dropped_objects,
            "stashed_responses": stashed_responses,
            "pending_oneshot_ingestions": pending_oneshot_ingestions,
            "introspection_ids": introspection_ids,
            "instances": instances,
            "initialized": initialized,
            "config": format!("{config:?}"),
            "persist_location": format!("{persist_location:?}"),
            "recorded_frontiers": recorded_frontiers,
            "recorded_replica_frontiers": recorded_replica_frontiers,
            "wallclock_lag_last_recorded": format!("{wallclock_lag_last_recorded:?}"),
            "migrated_storage_collections": migrated_storage_collections,
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
2731}
2732
2733pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2740 if txn.get_txn_wal_shard().is_none() {
2741 let txns_id = ShardId::new();
2742 txn.write_txn_wal_shard(txns_id)?;
2743 }
2744
2745 Ok(())
2746}
2747
2748impl<T> Controller<T>
2749where
2750 T: Timestamp
2751 + Lattice
2752 + TotalOrder
2753 + Codec64
2754 + From<EpochMillis>
2755 + TimestampManipulation
2756 + Into<Datum<'static>>,
2757 Self: StorageController<Timestamp = T>,
2758{
    /// Creates a new storage controller.
    ///
    /// Opens the persist location, the txn-wal write/read machinery, and the
    /// collection manager, and assembles the controller state with empty
    /// collection/instance maps. Panics if the persist location is invalid or
    /// if `prepare_initialization` was not called on `txn` beforehand.
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Kick off background warming of persist state for all known shards;
        // the task is aborted if it is still running when `self` is dropped.
        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        // In read-only mode we may not write to the txns shard, so only a
        // follower (writer handle used to track the upper) is opened.
        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let mut txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;
            txns.upgrade_version().await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        // Introspection collections register themselves later, during
        // bootstrap; start with empty maps.
        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        // Periodic maintenance runs at most once per second; missed ticks are
        // skipped rather than bursted.
        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }
2888
    /// Installs new read-hold policies for the given ingestion/export
    /// collections and applies any resulting capability changes in one batch.
    ///
    /// Must only be called for collections with ingestion or export state;
    /// hits an `unreachable!` otherwise. Unknown IDs are silently skipped.
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        // Net capability changes across all policies, applied once at the end.
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                // Compute the since implied by the new policy at the current
                // write frontier and record the delta against the old one.
                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }
2933
2934 #[instrument(level = "debug", fields(updates))]
2935 fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2936 let mut read_capability_changes = BTreeMap::default();
2937
2938 if let Some(collection) = self.collections.get_mut(&id) {
2939 let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2940 CollectionStateExtra::Ingestion(ingestion) => (
2941 &mut ingestion.write_frontier,
2942 &mut ingestion.derived_since,
2943 &ingestion.hold_policy,
2944 ),
2945 CollectionStateExtra::None => {
2946 if matches!(collection.data_source, DataSource::Progress) {
2947 } else {
2949 tracing::error!(
2950 ?collection,
2951 ?new_upper,
2952 "updated write frontier for collection which is not an ingestion"
2953 );
2954 }
2955 return;
2956 }
2957 CollectionStateExtra::Export(export) => (
2958 &mut export.write_frontier,
2959 &mut export.derived_since,
2960 &export.read_policy,
2961 ),
2962 };
2963
2964 if PartialOrder::less_than(write_frontier, new_upper) {
2965 write_frontier.clone_from(new_upper);
2966 }
2967
2968 let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2969 let mut update = swap_updates(derived_since, new_derived_since);
2970 if !update.is_empty() {
2971 read_capability_changes.insert(id, update);
2972 }
2973 } else if self.dropped_objects.contains_key(&id) {
2974 } else {
2977 soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2978 }
2979
2980 if !read_capability_changes.is_empty() {
2981 self.update_hold_capabilities(&mut read_capability_changes);
2982 }
2983 }
2984
    /// Applies a batch of read-capability changes, downgrading dependency read
    /// holds and notifying cluster instances of the new compaction frontiers.
    ///
    /// `updates` is drained as part of processing.
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        // Net effect per collection: (capability delta, resulting read
        // frontier, owning cluster).
        let mut collections_net = BTreeMap::new();

        // Drain pending updates in descending ID order. NOTE(review): the
        // reverse order looks deliberate (presumably so dependents are
        // processed before their dependencies) — confirm before relying on it.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        // Apply the delta to the capability multiset; keep any
                        // resulting frontier changes in `update` for netting.
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Second pass: act on the netted changes — downgrade dependency read
        // holds and tell the owning cluster it may compact.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }
3093
3094 fn validate_collection_ids(
3096 &self,
3097 ids: impl Iterator<Item = GlobalId>,
3098 ) -> Result<(), StorageError<T>> {
3099 for id in ids {
3100 self.storage_collections.check_exists(id)?;
3101 }
3102 Ok(())
3103 }
3104
3105 fn validate_export_ids(
3107 &self,
3108 ids: impl Iterator<Item = GlobalId>,
3109 ) -> Result<(), StorageError<T>> {
3110 for id in ids {
3111 self.export(id)?;
3112 }
3113 Ok(())
3114 }
3115
    /// Opens a persist write handle for the given collection's data shard.
    ///
    /// Panics on invalid persist usage (e.g. schema mismatch), which is a
    /// programming error rather than a runtime condition.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // Refresh the handle's cached upper before returning it, so callers
        // start from a recent view of the shard — presumably to avoid acting
        // on a stale upper; TODO confirm the exact motivation.
        write.fetch_recent_upper().await;

        write
    }
3157
    /// Registers `id` as the storage collection backing `introspection_type`
    /// and hands it off to the collection manager (differential or
    /// append-only, depending on the type).
    ///
    /// Panics if the introspection type is already registered. Returns an
    /// error if the collection's metadata cannot be resolved.
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // Migrated collections must be writable even in read-only mode; this
        // only ever applies to system collections.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        // Lazily opens a leased reader on the collection's data shard; used
        // by the differential collection manager when it needs a snapshot.
        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }
3270
3271 fn reconcile_dangling_statistics(&self) {
3274 self.source_statistics
3275 .lock()
3276 .expect("poisoned")
3277 .source_statistics
3278 .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3280 self.sink_statistics
3281 .lock()
3282 .expect("poisoned")
3283 .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3284 }
3285
3286 #[instrument(level = "debug")]
3296 fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3297 where
3298 I: Iterator<Item = GlobalId>,
3299 {
3300 mz_ore::soft_assert_or_log!(
3301 diff == Diff::MINUS_ONE || diff == Diff::ONE,
3302 "use 1 for insert or -1 for delete"
3303 );
3304
3305 let id = *self
3306 .introspection_ids
3307 .get(&IntrospectionType::ShardMapping)
3308 .expect("should be registered before this call");
3309
3310 let mut updates = vec![];
3311 let mut row_buf = Row::default();
3313
3314 for global_id in global_ids {
3315 let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3316 collection.collection_metadata.data_shard.clone()
3317 } else {
3318 panic!("unknown global id: {}", global_id);
3319 };
3320
3321 let mut packer = row_buf.packer();
3322 packer.push(Datum::from(global_id.to_string().as_str()));
3323 packer.push(Datum::from(shard_id.to_string().as_str()));
3324 updates.push((row_buf.clone(), diff));
3325 }
3326
3327 self.collection_manager.differential_append(id, updates);
3328 }
3329
    /// Computes the set of collection IDs that `self_id` (described by
    /// `collection_desc`) depends on. Note that for ingestions and sinks the
    /// result includes `self_id` itself.
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        // A collection with a designated primary depends on it.
        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            // These kinds carry no further storage dependencies.
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport { ingestion_id, .. } => {
                // An export depends on its parent ingestion's remap collection
                // (looked up through the parent, which must already exist).
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                dependencies.extend([self_id, ingestion_remap_collection_id]);
            }
            DataSource::Ingestion(ingestion) => {
                dependencies.push(self_id);
                // Guard against a self-referential remap collection; avoid
                // listing the same dependency twice.
                if self_id != ingestion.remap_collection_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => {
                // A sink depends on itself and the collection it exports.
                dependencies.extend([self_id, desc.sink.from]);
            }
        };

        Ok(dependencies)
    }
3385
    /// Opens a persist read handle suitable for snapshotting collection `id`,
    /// resolving its shard via the collection metadata.
    async fn read_handle_for_snapshot(
        &self,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let metadata = self.storage_collections.collection_metadata(id)?;
        read_handle_for_snapshot(&self.persist, id, &metadata).await
    }
3393
3394 fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3397 if self.read_only {
3398 return;
3399 }
3400
3401 let mut sink_status_updates = vec![];
3402 let mut source_status_updates = vec![];
3403
3404 for update in updates {
3405 let id = update.id;
3406 if self.export(id).is_ok() {
3407 sink_status_updates.push(update);
3408 } else if self.storage_collections.check_exists(id).is_ok() {
3409 source_status_updates.push(update);
3410 }
3411 }
3412
3413 self.append_status_introspection_updates(
3414 IntrospectionType::SourceStatusHistory,
3415 source_status_updates,
3416 );
3417 self.append_status_introspection_updates(
3418 IntrospectionType::SinkStatusHistory,
3419 sink_status_updates,
3420 );
3421 }
3422
3423 fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3424 self.collections
3425 .get(&id)
3426 .ok_or(StorageError::IdentifierMissing(id))
3427 }
3428
    /// Builds the full `RunIngestionCommand` for ingestion `id` (resolving
    /// storage metadata for all source exports and the remap collection) and
    /// sends it to the owning cluster instance.
    ///
    /// Errors if `id` is not an ingestion, if any referenced collection is
    /// unknown, or if the target instance does not exist.
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich each source export with the storage metadata of its output
        // collection; the cluster needs this to write the data shards.
        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }
3483
    /// Builds the `RunSinkCommand` for export `id` and sends it to the owning
    /// cluster instance.
    ///
    /// Errors if `id` is not a sink, if referenced collections are unknown,
    /// or if the target instance does not exist.
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Advance the as-of to at least the export's current implied
        // capability; the configured as-of may be stale after restarts.
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        // Only emit the snapshot if the (possibly advanced) as-of has not yet
        // been surpassed by the sink's own write frontier — otherwise the
        // snapshot was presumably already emitted in a previous run.
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: description.sink.commit_interval,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }
3544
    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the current frontiers against the previously
    /// recorded ones and emitting only the changes.
    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            // Resolve the cluster instance that runs this collection, if any;
            // plain storage collections (no ingestion/export) have none.
            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                // NOTE(review): every replica is recorded with the collection's
                // global upper, not a per-replica frontier — confirm this is
                // the intended semantics.
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        // Renders one (id, since, upper) introspection row with the given diff.
        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        // Renders one (id, replica, upper) introspection row with the given diff.
        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Diff new vs. previously recorded global frontiers: emit an insert
        // and a retraction on change, only an insert for new entries, and a
        // retraction for entries that disappeared.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        // Same diffing scheme for per-replica frontiers.
        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }
3643
    /// Samples the wallclock lag of every active collection, updating the
    /// per-collection running maximum, the lag metrics, and (when enabled)
    /// the histogram stash, then triggers periodic recording.
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        // Lag of a frontier relative to wallclock; an empty frontier (closed
        // collection) counts as zero lag.
        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // A collection whose upper has not passed its since cannot be
            // read, so its lag is reported as undefined rather than a number.
            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lag is exported to metrics as u64::MAX.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by rounding up to the next power of two.
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }
3718
3719 fn maybe_record_wallclock_lag(&mut self) {
3727 if self.read_only {
3728 return;
3729 }
3730
3731 let duration_trunc = |datetime: DateTime<_>, interval| {
3732 let td = TimeDelta::from_std(interval).ok()?;
3733 datetime.duration_trunc(td).ok()
3734 };
3735
3736 let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3737 let now_dt = mz_ore::now::to_datetime((self.now)());
3738 let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3739 soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3740 let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3741 duration_trunc(now_dt, *default).unwrap()
3742 });
3743 if now_trunc <= self.wallclock_lag_last_recorded {
3744 return;
3745 }
3746
3747 let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3748
3749 let mut history_updates = Vec::new();
3750 let mut histogram_updates = Vec::new();
3751 let mut row_buf = Row::default();
3752 for frontiers in self.storage_collections.active_collection_frontiers() {
3753 let id = frontiers.id;
3754 let Some(collection) = self.collections.get_mut(&id) else {
3755 continue;
3756 };
3757
3758 let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
3759 let row = Row::pack_slice(&[
3760 Datum::String(&id.to_string()),
3761 Datum::Null,
3762 max_lag.into_interval_datum(),
3763 Datum::TimestampTz(now_ts),
3764 ]);
3765 history_updates.push((row, Diff::ONE));
3766
3767 let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3768 continue;
3769 };
3770
3771 for ((period, lag, labels), count) in std::mem::take(stash) {
3772 let mut packer = row_buf.packer();
3773 packer.extend([
3774 Datum::TimestampTz(period.start),
3775 Datum::TimestampTz(period.end),
3776 Datum::String(&id.to_string()),
3777 lag.into_uint64_datum(),
3778 ]);
3779 let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3780 packer.push_dict(labels);
3781
3782 histogram_updates.push((row_buf.clone(), count));
3783 }
3784 }
3785
3786 if !history_updates.is_empty() {
3787 self.append_introspection_updates(
3788 IntrospectionType::WallclockLagHistory,
3789 history_updates,
3790 );
3791 }
3792 if !histogram_updates.is_empty() {
3793 self.append_introspection_updates(
3794 IntrospectionType::WallclockLagHistogram,
3795 histogram_updates,
3796 );
3797 }
3798
3799 self.wallclock_lag_last_recorded = now_trunc;
3800 }
3801
3802 fn maintain(&mut self) {
3807 self.update_frontier_introspection();
3808 self.refresh_wallclock_lag();
3809
3810 for instance in self.instances.values_mut() {
3812 instance.refresh_state_metrics();
3813 }
3814 }
3815}
3816
3817impl From<&IntrospectionType> for CollectionManagerKind {
3818 fn from(value: &IntrospectionType) -> Self {
3819 match value {
3820 IntrospectionType::ShardMapping
3821 | IntrospectionType::Frontiers
3822 | IntrospectionType::ReplicaFrontiers
3823 | IntrospectionType::StorageSourceStatistics
3824 | IntrospectionType::StorageSinkStatistics
3825 | IntrospectionType::ComputeDependencies
3826 | IntrospectionType::ComputeOperatorHydrationStatus
3827 | IntrospectionType::ComputeMaterializedViewRefreshes
3828 | IntrospectionType::ComputeErrorCounts
3829 | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3830
3831 IntrospectionType::SourceStatusHistory
3832 | IntrospectionType::SinkStatusHistory
3833 | IntrospectionType::PrivatelinkConnectionStatusHistory
3834 | IntrospectionType::ReplicaStatusHistory
3835 | IntrospectionType::ReplicaMetricsHistory
3836 | IntrospectionType::WallclockLagHistory
3837 | IntrospectionType::WallclockLagHistogram
3838 | IntrospectionType::PreparedStatementHistory
3839 | IntrospectionType::StatementExecutionHistory
3840 | IntrospectionType::SessionHistory
3841 | IntrospectionType::StatementLifecycleHistory
3842 | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3843 }
3844 }
3845}
3846
3847async fn snapshot_statistics<T>(
3853 id: GlobalId,
3854 upper: Antichain<T>,
3855 storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3856) -> Vec<Row>
3857where
3858 T: Codec64 + From<EpochMillis> + TimestampManipulation,
3859{
3860 match upper.as_option() {
3861 Some(f) if f > &T::minimum() => {
3862 let as_of = f.step_back().unwrap();
3863
3864 let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3865 snapshot
3866 .into_iter()
3867 .map(|(row, diff)| {
3868 assert_eq!(diff, 1);
3869 row
3870 })
3871 .collect()
3872 }
3873 _ => Vec::new(),
3876 }
3877}
3878
3879async fn read_handle_for_snapshot<T>(
3880 persist: &PersistClientCache,
3881 id: GlobalId,
3882 metadata: &CollectionMetadata,
3883) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3884where
3885 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3886{
3887 let persist_client = persist
3888 .open(metadata.persist_location.clone())
3889 .await
3890 .unwrap();
3891
3892 let read_handle = persist_client
3897 .open_leased_reader::<SourceData, (), _, _>(
3898 metadata.data_shard,
3899 Arc::new(metadata.relation_desc.clone()),
3900 Arc::new(UnitSchema),
3901 Diagnostics {
3902 shard_name: id.to_string(),
3903 handle_purpose: format!("snapshot {}", id),
3904 },
3905 USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3906 )
3907 .await
3908 .expect("invalid persist usage");
3909 Ok(read_handle)
3910}
3911
/// Controller-side state maintained for each storage collection.
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The source of this collection's data.
    pub data_source: DataSource<T>,

    /// Metadata for the collection (persist location, data shard, relation
    /// description).
    pub collection_metadata: CollectionMetadata,

    /// Additional state for ingestions and exports, if applicable.
    pub extra_state: CollectionStateExtra<T>,

    // Maximum wallclock lag observed since the last introspection recording;
    // reset by `maybe_record_wallclock_lag`.
    wallclock_lag_max: WallclockLag,
    // Accumulated histogram counts of observed wallclock lags, keyed by
    // (recording period, lag bucket, labels). `None` for collections that do
    // not track a lag histogram (see `CollectionState::new`); drained by
    // `maybe_record_wallclock_lag`.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    // Metrics tracking this collection's wallclock lag.
    wallclock_lag_metrics: WallclockLagMetrics,
}
3943
3944impl<T: TimelyTimestamp> CollectionState<T> {
3945 fn new(
3946 data_source: DataSource<T>,
3947 collection_metadata: CollectionMetadata,
3948 extra_state: CollectionStateExtra<T>,
3949 wallclock_lag_metrics: WallclockLagMetrics,
3950 ) -> Self {
3951 let wallclock_lag_histogram_stash = match &data_source {
3955 DataSource::Other => None,
3956 _ => Some(Default::default()),
3957 };
3958
3959 Self {
3960 data_source,
3961 collection_metadata,
3962 extra_state,
3963 wallclock_lag_max: WallclockLag::MIN,
3964 wallclock_lag_histogram_stash,
3965 wallclock_lag_metrics,
3966 }
3967 }
3968}
3969
/// Collection state that only applies to certain kinds of collections.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    /// Ingestion-specific state.
    Ingestion(IngestionState<T>),
    /// Export-specific state.
    Export(ExportState<T>),
    /// No extra state.
    None,
}
3977
/// State maintained for an ingestion.
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Accumulation of read capabilities held for this ingestion.
    pub read_capabilities: MutableAntichain<T>,

    /// The since frontier derived from the read capabilities.
    // NOTE(review): the derivation itself is not visible in this file
    // section — confirm against the code that updates this field.
    pub derived_since: Antichain<T>,

    /// Read holds this ingestion keeps on its dependency collections.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// The ingestion's write frontier.
    pub write_frontier: Antichain<T>,

    /// Policy governing the read holds taken for this ingestion.
    pub hold_policy: ReadPolicy<T>,

    /// The storage instance this ingestion runs on; used e.g. to look up the
    /// instance's workload class for wallclock lag metrics.
    pub instance_id: StorageInstanceId,

    /// Replicas on which this ingestion is hydrated.
    // NOTE(review): inferred from the field name — confirm against writers.
    pub hydrated_on: BTreeSet<ReplicaId>,
}
4008
/// Describes how to process rows of a status-history introspection
/// collection: how long to retain entries and how to extract their key and
/// event time from a row's datums.
struct StatusHistoryDesc<K> {
    // How long entries are kept before being eligible for removal.
    retention_policy: StatusHistoryRetentionPolicy,
    // Extracts the key (e.g. the object and replica ids) from a row's datums.
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    // Extracts the `occurred_at` timestamp from a row's datums.
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}
/// Retention policies for status-history collections.
enum StatusHistoryRetentionPolicy {
    /// Keep only the most recent N entries.
    LastN(usize),
    /// Keep only entries within the given time window.
    TimeWindow(Duration),
}
4024
4025fn source_status_history_desc(
4026 params: &StorageParameters,
4027) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
4028 let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
4029 let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
4030 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4031 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4032
4033 StatusHistoryDesc {
4034 retention_policy: StatusHistoryRetentionPolicy::LastN(
4035 params.keep_n_source_status_history_entries,
4036 ),
4037 extract_key: Box::new(move |datums| {
4038 (
4039 GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
4040 if datums[replica_id_idx].is_null() {
4041 None
4042 } else {
4043 Some(
4044 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4045 .expect("ReplicaId column"),
4046 )
4047 },
4048 )
4049 }),
4050 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4051 }
4052}
4053
4054fn sink_status_history_desc(
4055 params: &StorageParameters,
4056) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
4057 let desc = &MZ_SINK_STATUS_HISTORY_DESC;
4058 let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
4059 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4060 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4061
4062 StatusHistoryDesc {
4063 retention_policy: StatusHistoryRetentionPolicy::LastN(
4064 params.keep_n_sink_status_history_entries,
4065 ),
4066 extract_key: Box::new(move |datums| {
4067 (
4068 GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
4069 if datums[replica_id_idx].is_null() {
4070 None
4071 } else {
4072 Some(
4073 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4074 .expect("ReplicaId column"),
4075 )
4076 },
4077 )
4078 }),
4079 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4080 }
4081}
4082
4083fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
4084 let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
4085 let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
4086 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4087
4088 StatusHistoryDesc {
4089 retention_policy: StatusHistoryRetentionPolicy::LastN(
4090 params.keep_n_privatelink_status_history_entries,
4091 ),
4092 extract_key: Box::new(move |datums| {
4093 GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4094 }),
4095 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4096 }
4097}
4098
4099fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4100 let desc = &REPLICA_STATUS_HISTORY_DESC;
4101 let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4102 let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4103 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4104
4105 StatusHistoryDesc {
4106 retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4107 params.replica_status_history_retention_window,
4108 ),
4109 extract_key: Box::new(move |datums| {
4110 (
4111 GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4112 datums[process_idx].unwrap_uint64(),
4113 )
4114 }),
4115 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4116 }
4117}
4118
4119fn swap_updates<T: Timestamp>(
4121 from: &mut Antichain<T>,
4122 mut replace_with: Antichain<T>,
4123) -> ChangeBatch<T> {
4124 let mut update = ChangeBatch::new();
4125 if PartialOrder::less_equal(from, &replace_with) {
4126 update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
4127 std::mem::swap(from, &mut replace_with);
4128 update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
4129 }
4130 update
4131}