use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
    StatusUpdate, StorageCommand, StorageResponse, TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

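/// A oneshot ingestion (e.g. a bulk `COPY FROM`) that the controller has asked
/// a cluster to run and whose result it is still waiting on.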
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to report the result of the ingestion to the waiting client.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// Cluster (storage instance) the ingestion is running on.
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
    /// Consume the pending ingestion, reporting to the client that it was canceled.
    pub(crate) fn cancel(self) {
        // Report a generic cancelation error to the waiting client.
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

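/// A storage controller for a storage instance.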
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,
    /// Whether this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances it
    /// controls are allowed to affect changes to external systems; writes are
    /// rejected and replicas are not told to allow writes.
    read_only: bool,

    /// Collections maintained by the storage controller.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Objects that have been dropped but for which we are still awaiting
    /// `DroppedId` acknowledgments from the given replicas.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Write handle for table shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// A shared txn-wal cache, running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    /// Responses from instances that have yet to be processed.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Mapping of introspection types to the IDs of their backing collections.
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens that hold on to resources (e.g. read holds) of introspection
    /// collections for as long as they exist.
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    /// Per-replica source statistics, shared with the statistics tasks.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Per-replica sink statistics, shared with the statistics tasks.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// A channel for broadcasting the current statistics collection interval.
    statistics_interval_sender: Sender<Duration>,

    /// Storage instances, keyed by their ID.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Whether `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration, also sent to newly created instances.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,
    /// Metrics of the storage controller.
    metrics: StorageControllerMetrics,
    /// The (since, upper) frontiers reported by the last introspection update.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// The per-replica write frontiers reported by the last introspection update.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function computing the wallclock lag for a given frontier.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to the shared `StorageCollections` service.
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Collections migrated during bootstrap, which may be written to even in
    /// read-only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Channel on which instances send responses to the controller.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receiving end of `instance_response_tx`.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task that warms up persist state during startup; dropped
    /// (and thereby aborted) once collections have been created.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

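/// Warms up persist state for `shard_ids` in a background task.
///
/// This is purely a startup optimization; the result of the task is only held
/// on to so that the opened fetchers stay alive.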
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    // Bound the number of shards that are warmed concurrently.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        // Persist is configured through its own process-global dyncfg set.
        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        self.storage_collections.collection_metadata(id)
    }

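    /// Returns whether the identified ingestion is hydrated on at least one
    /// replica. Collections that are not ingestions, as well as ingestions on
    /// clusters with no replicas, are considered trivially hydrated.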
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on clusters without replicas cannot make progress,
            // so we consider them trivially hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                // An ingestion is hydrated if it is hydrated on at least one replica.
                Ok(!ingestion_state.hydrated_on.is_empty())
            }
            CollectionStateExtra::Export(_) => {
                // For now, sinks are always considered hydrated.
                Ok(true)
            }
            CollectionStateExtra::None => {
                // For now, objects that are not ingestions are always
                // considered hydrated.
                Ok(true)
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        // An empty set of target replicas is trivially hydrated.
        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
            return Ok(true);
        }

        // If target replicas were given, only hydration on those counts;
        // otherwise hydration on any replica does.
        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    if &state.instance_id != target_cluster_id {
                        continue;
                    }
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => {
                            // No target replicas, so accept hydration on any replica.
                            !state.hydrated_on.is_empty()
                        }
                    }
                }
                CollectionStateExtra::Export(_) => {
                    // For now, sinks are always considered hydrated.
                    true
                }
                CollectionStateExtra::None => {
                    // For now, objects that are not ingestions are always
                    // considered hydrated.
                    true
                }
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
                // We keep looping instead of breaking out early, so that we
                // log all non-hydrated collections.
            }
        }
        Ok(all_hydrated)
    }

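    /// Returns the since/upper frontiers of the identified collection.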
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing> {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, CollectionMissing> {
        let mut result = vec![];
        // Exports are reported with the since of their input hold rather than
        // the implied capability tracked by `storage_collections`.
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

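    /// Returns the IDs of the ingestion exports that are still active on the
    /// given instance, filtering out exports whose collections have advanced
    /// to the empty write frontier.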
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        let active_storage_collections: BTreeMap<_, _> = self
            .storage_collections
            .active_collection_frontiers()
            .into_iter()
            .map(|c| (c.id, c))
            .collect();

        let active_exports = self.instances[&instance_id]
            .active_ingestion_exports()
            .filter(move |id| {
                let frontiers = active_storage_collections.get(id);
                match frontiers {
                    Some(frontiers) => !frontiers.write_frontier.is_empty(),
                    None => {
                        // The collection is not active anymore, so neither is
                        // the export.
                        false
                    }
                }
            });

        Box::new(active_exports)
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

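    /// Creates a new storage instance and brings it up to date with the
    /// controller's initialization, write, and configuration state.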
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            metrics,
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

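    /// Drops the given replica from its instance, records `Paused` status
    /// updates for the objects that were running on it, and stops waiting for
    /// pending `DroppedId` acknowledgments from it.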
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            // We no longer have to wait for DroppedId messages from this
            // replica.
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // Synthesize paused statuses for the ingestion's collections,
            // except for the remap collection of "old-style" ingestions
            // (those whose ID differs from their remap collection's ID).
            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            // We no longer have to wait for DroppedId messages from this
            // replica.
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

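    /// During bootstrap, evolves the persist schemas of the given collections
    /// to the provided `RelationDesc`s (e.g. to relax column nullability),
    /// failing if an evolution is incompatible or races with a concurrent one.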
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

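    /// Creates the described collections: opens persist handles, registers
    /// tables with the table worker, wires up ingestions and sinks, and
    /// (unless in read-only mode) starts the corresponding dataflows.
    ///
    /// A `register_ts` must be provided whenever the collections include a table.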
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // We are done with bootstrapping the collection shards, so the warming
        // task is no longer needed; dropping it aborts it.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state: the input must not
        // contain the same ID twice with differing descriptions.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // Collections written through txn-wal get the shared txns
                // shard recorded in their metadata.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // Open persist handles for all collections concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};
        // We only read from `self` inside the concurrent closures below, so
        // reborrow it immutably.
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            // Bound the concurrency of handle-opening.
            .buffer_unordered(50)
            .try_collect()
            .await?;

        // The set of collections that we should render dataflows for.
        let mut to_execute = BTreeSet::new();
        // The set of collections we are creating.
        let mut new_collections = BTreeSet::new();
        // Tables that need to be registered with the table worker.
        let mut table_registers = Vec::with_capacity(to_register.len());

        // Sort by ID for a deterministic processing order.
        to_register.sort_by_key(|(id, ..)| *id);

        // Split off tables and process them in reverse ID order; all other
        // collections follow in ID order.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics entries to initialize for the newly created collections.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            // Determine the collection's dependencies and acquire read holds
            // on all of them.
            let storage_dependencies = self.determine_collection_dependencies(id, &description)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            let data_source = description.data_source;

            // Ensure that the dependency since has not advanced beyond the
            // dependent's upper, which would make the dependency unreadable
            // at the times of interest.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                // This invariant is only expected to hold for collections
                // without a designated primary.
                if description.primary.is_none() {
                    mz_ore::soft_assert_or_log!(
                        write_frontier.elements() == &[T::minimum()]
                            || write_frontier.is_empty()
                            || PartialOrder::less_than(&dependency_since, write_frontier),
                        "dependency since has advanced past dependent ({id}) upper \n
                        dependent ({id}): upper {:?} \n
                        dependency since {:?} \n
                        dependency read holds: {:?}",
                        write_frontier,
                        dependency_since,
                        dependency_read_holds,
                    );
                }
            }

            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    // This collection of statistics is periodically aggregated
                    // into `source_statistics`.
                    new_webhook_statistic_entries.insert(id);
                    // Register the collection so our manager knows about it.
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the export directly does nothing; schedule the
                    // ingestion it belongs to for (re-)execution instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            // Webhooks don't run on clusters/replicas, so we initialize their
            // statistics here and keep them for the collection's lifetime.
            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }
        }

        // Register the tables all in one batch.
        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we may only register migrated tables,
                // which are the only tables we are allowed to write to.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // Finally, execute the dataflows for all collections that need one.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    // In read-only mode, only run ingestions whose connections
                    // support read-only operation, and only when 0dt deployment
                    // of sources is enabled.
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let source_collection = self.collection(ingestion_id)?;
        let data_source = &source_collection.data_source;
        match &data_source {
            DataSource::Ingestion(cur_ingestion) => {
                cur_ingestion
                    .desc
                    .alter_compatible(ingestion_id, source_desc)?;
            }
            o => {
                tracing::info!(
                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
                    o
                );
                Err(AlterError { id: ingestion_id })?
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Also have to let StorageCollections know!
        self.storage_collections
            .alter_ingestion_connections(source_connections.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (id, conn) in source_connections {
            let collection = self
                .collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is no sense in
                    // re-rendering the dataflow.
                    if ingestion.desc.connection != conn {
                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                        ingestions_to_run.insert(id);
                    } else {
                        tracing::warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Also have to let StorageCollections know!
        self.storage_collections
            .alter_ingestion_export_data_configs(source_exports.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // We need to adjust the data config on the source export
            // collection itself...
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // ...as well as on the ingestion that the export belongs to.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // If the data config hasn't changed, there is no sense in
                    // re-rendering the dataflow.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if existing.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Evolve the schema of the shard through StorageCollections.
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // Note: tables are always written through the txn-wal shard.
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            DataSource::Table,
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        // The schema evolution succeeded, so update our in-memory state.
        self.collections.insert(new_collection, collection_state);

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }

    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get(&id)
            .and_then(|c| match &c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get_mut(&id)
            .and_then(|c| match &mut c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

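    /// Starts a oneshot ingestion on the given instance and records it as
    /// pending; `result_tx` is invoked once the cluster reports staged batches
    /// or the ingestion is canceled. Fails in read-only mode.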
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }

    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        // Report the cancelation to the waiting client.
        pending.cancel();

        Ok(())
    }

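    /// Replaces the description of the given export with a new one, acquiring
    /// fresh read holds and re-running the sink command on its instance.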
    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        // Acquire read holds to ensure that neither the sinked collection nor
        // the export itself are dropped while we alter the sink.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Check whether the input is still readable at the sink's write frontier.
        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot: new_description.sink.with_snapshot,
                to_storage_metadata,
                commit_interval: new_description.sink.commit_interval,
            },
        };

        // Fetch the instance the sink is running on.
        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }

    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            // Fetch the current export state and construct a new description
            // from it, swapping in the new connection.
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Ensure the new connection is a compatible alteration.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    // Use the since of the export's input hold as the as-of.
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                    commit_interval: new_export_description.sink.commit_interval,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the instance these exports are running on.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update the in-memory state only after all fallible steps succeeded.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }

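    /// Drops the given table collections: table shards are handed to the
    /// table worker for cleanup at `ts`, while webhook- and ingestion-backed
    /// "tables" are dropped through the source-dropping path.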
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // A "table" can be backed by different data sources; dispatch to the
        // appropriate drop path for each.
        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
            .into_iter()
            .partition(|id| match self.collections[id].data_source {
                DataSource::Table => true,
                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                _ => panic!("identifier is not a table: {}", id),
            });

        // Drop table write handles, notifying the drop channel once the table
        // worker is done with them.
        if !table_write_ids.is_empty() {
            let drop_notif = self
                .persist_table_worker
                .drop_handles(table_write_ids.clone(), ts);
            let tx = self.pending_table_handle_drops_tx.clone();
            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
                drop_notif.await;
                for identifier in table_write_ids {
                    let _ = tx.send(identifier);
                }
            });
        }

        // Drop source-backed tables through the source path.
        if !data_source_ids.is_empty() {
            self.validate_collection_ids(data_source_ids.iter().cloned())?;
            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
        }

        Ok(())
    }

    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_collection_ids(identifiers.iter().cloned())?;
        self.drop_sources_unvalidated(storage_metadata, identifiers)
    }

    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Ingestions whose source exports changed and need to be re-executed.
        let mut ingestions_to_execute = BTreeSet::new();
        // Ingestions (and ingestion exports) that are being dropped.
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        // Other collections (webhooks, tables, ...) whose state we need to
        // clean up.
        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Unregister the webhook collection from the collection
                        // manager in the background.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping a source export requires modifying the
                        // ingestion it belongs to and then re-executing it.
                        ingestions_to_execute.insert(ingestion_id);

                        // Adjust the source to remove this export.
                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            // Primary ingestion was already dropped.
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        // Ingestion exports also carry ingestion state, so they
                        // are dropped like ingestions.
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        // These types of collections cannot be dropped as sources.
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id})",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not re-execute ingestions that are about to be dropped.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Policies that allow the since to advance to the empty antichain,
        // and thus release the collections, once nothing is holding them
        // back anymore.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Update the shard-mapping introspection for all dropped collections.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        // Record the drop in the source status history.
        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        // Remove statistics for the dropped sources.
        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        // Remove collection state.
        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            // If the collection was running on an instance with active
            // replicas, keep waiting for DroppedId acknowledgments from those
            // replicas before fully forgetting about the object.
            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    // An ingestion's remap collection is dropped on the replica
                    // together with the ingestion itself, so also expect
                    // DroppedId messages for it (unless the ingestion is its
                    // own remap collection).
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }

    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_export_ids(identifiers.iter().cloned())?;
        self.drop_sinks_unvalidated(storage_metadata, identifiers);
        Ok(())
    }

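    /// Like `drop_sinks`, but without validating that the identifiers denote
    /// exports; unknown IDs are silently skipped.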
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        // Ignore exports that have already been removed.
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        // Policies that allow the since to advance to the empty antichain,
        // and thus release the exports, once nothing is holding them back
        // anymore.
        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        // Record the drop in the sink status history and drop statistics.
        let status_now = mz_ore::now::to_datetime((self.now)());

        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        // Remove export state.
        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            // If the export was running on an instance with active replicas,
            // keep waiting for DroppedId acknowledgments from those replicas.
            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }

    #[instrument(level = "debug")]
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        if self.read_only {
            // In read-only mode, only migrated system collections may be
            // written to.
            if !commands
                .iter()
                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
            {
                return Err(StorageError::ReadOnly);
            }
        }

        // Non-empty appends must advance the upper strictly beyond `write_ts`.
        for (id, updates) in commands.iter() {
            if !updates.is_empty() {
                if !write_ts.less_than(&advance_to) {
                    return Err(StorageError::UpdateBeyondUpper(*id));
                }
            }
        }

        Ok(self
            .persist_table_worker
            .append(write_ts, advance_to, commands))
    }

    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }

    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
        // Webhooks don't run on clusters/replicas, so their statistics are
        // kept separately from the per-replica source statistics.
        let source_statistics = self.source_statistics.lock().expect("poisoned");
        source_statistics
            .webhook_statistics
            .get(&id)
            .cloned()
            .ok_or(StorageError::IdentifierMissing(id))
    }

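    /// Waits until the controller is ready to process a response, i.e. until
    /// there is work to do: buffered instance responses, pending table handle
    /// drops, or scheduled maintenance.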
2142 async fn ready(&mut self) {
2143 if self.maintenance_scheduled {
2144 return;
2145 }
2146
2147 if !self.pending_table_handle_drops_rx.is_empty() {
2148 return;
2149 }
2150
2151 tokio::select! {
2152 Some(m) = self.instance_response_rx.recv() => {
2153 self.stashed_responses.push(m);
2154 while let Ok(m) = self.instance_response_rx.try_recv() {
2155 self.stashed_responses.push(m);
2156 }
2157 }
2158 _ = self.maintenance_ticker.tick() => {
2159 self.maintenance_scheduled = true;
2160 },
2161 };
2162 }
2163
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    {
                        // Statistics that cannot be attributed to a replica
                        // are dropped.
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // The collection may have been dropped
                            // concurrently; skip its statistics.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            // The collection may have been dropped
                            // concurrently; skip its statistics.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            if ingestion_state.hydrated_on.is_empty() {
                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                            }
                                            ingestion_state.hydrated_on.insert(replica_id.expect(
                                                "replica id should be present for status running",
                                            ));
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // Hydration is not tracked for exports.
                                        }
                                        CollectionStateExtra::None => {
                                            // Hydration is only tracked for ingestions.
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                            ingestion_state.hydrated_on.clear();
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // Hydration is not tracked for exports.
                                        }
                                        CollectionStateExtra::None => {
                                            // Hydration is only tracked for ingestions.
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Ask the replicas to clean up the now-complete
                                // oneshot ingestion.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                (pending.result_tx)(batches)
                            }
                            None => {
                                // The ingestion was presumably canceled; drop
                                // the batches.
                            }
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }

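    /// Renders the persist state of the given collection's data shard as
    /// JSON, for introspection purposes.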
    async fn inspect_persist_state(
        &self,
        id: GlobalId,
    ) -> Result<serde_json::Value, anyhow::Error> {
        let collection = &self.storage_collections.collection_metadata(id)?;
        let client = self
            .persist
            .open(collection.persist_location.clone())
            .await?;
        let shard_state = client
            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
            .await?;
        let json_state = serde_json::to_value(shard_state)?;
        Ok(json_state)
    }

    fn append_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<(Row, Diff)>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates = updates.into_iter().map(|update| update.into()).collect();
        self.collection_manager.blind_write(id, updates);
    }

    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
        if !updates.is_empty() {
            self.collection_manager.blind_write(id, updates);
        }
    }

    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }

    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }

    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }

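    /// Returns a future that computes a "real-time recent" timestamp for the
    /// given user sources: the maximum upstream timestamp observed across
    /// them, with each per-source probe bounded by `timeout`.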
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Real-time recency is only computed for user sources.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    // Load generator sources have no external upstream to
                    // consult.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                // Skip non-ingestion collections.
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold back the since of the remap shard while we read it.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
                        .await
                        .map_err(|e| {
                            tracing::debug!(?id, "real time recency error: {:?}", e);
                            e
                        });

                    // Release the read hold only once the subscription is done
                    // with the remap shard.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }

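    /// Returns a JSON dump of the controller's in-memory state, for debugging.
    ///
    /// The exhaustive `Self` destructuring below ensures this method is
    /// revisited whenever new fields are added.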
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            build_info: _,
            now: _,
            read_only,
            collections,
            dropped_objects,
            persist_table_worker: _,
            txns_read: _,
            txns_metrics: _,
            stashed_responses,
            pending_table_handle_drops_tx: _,
            pending_table_handle_drops_rx: _,
            pending_oneshot_ingestions,
            collection_manager: _,
            introspection_ids,
            introspection_tokens: _,
            source_statistics: _,
            sink_statistics: _,
            statistics_interval_sender: _,
            instances,
            initialized,
            config,
            persist_location,
            persist: _,
            metrics: _,
            recorded_frontiers,
            recorded_replica_frontiers,
            wallclock_lag: _,
            wallclock_lag_last_recorded,
            storage_collections: _,
            migrated_storage_collections,
            maintenance_ticker: _,
            maintenance_scheduled,
            instance_response_tx: _,
            instance_response_rx: _,
            persist_warm_task: _,
        } = self;

        let collections: BTreeMap<_, _> = collections
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let dropped_objects: BTreeMap<_, _> = dropped_objects
            .iter()
            .map(|(id, rs)| (id.to_string(), format!("{rs:?}")))
            .collect();
        let stashed_responses: Vec<_> =
            stashed_responses.iter().map(|r| format!("{r:?}")).collect();
        let pending_oneshot_ingestions: BTreeMap<_, _> = pending_oneshot_ingestions
            .iter()
            .map(|(uuid, i)| (uuid.to_string(), format!("{i:?}")))
            .collect();
        let introspection_ids: BTreeMap<_, _> = introspection_ids
            .iter()
            .map(|(typ, id)| (format!("{typ:?}"), id.to_string()))
            .collect();
        let instances: BTreeMap<_, _> = instances
            .iter()
            .map(|(id, i)| (id.to_string(), format!("{i:?}")))
            .collect();
        let recorded_frontiers: BTreeMap<_, _> = recorded_frontiers
            .iter()
            .map(|(id, fs)| (id.to_string(), format!("{fs:?}")))
            .collect();
        let recorded_replica_frontiers: Vec<_> = recorded_replica_frontiers
            .iter()
            .map(|((gid, rid), f)| (gid.to_string(), rid.to_string(), format!("{f:?}")))
            .collect();
        let migrated_storage_collections: Vec<_> = migrated_storage_collections
            .iter()
            .map(|id| id.to_string())
            .collect();

        Ok(serde_json::json!({
            "read_only": read_only,
            "collections": collections,
            "dropped_objects": dropped_objects,
            "stashed_responses": stashed_responses,
            "pending_oneshot_ingestions": pending_oneshot_ingestions,
            "introspection_ids": introspection_ids,
            "instances": instances,
            "initialized": initialized,
            "config": format!("{config:?}"),
            "persist_location": format!("{persist_location:?}"),
            "recorded_frontiers": recorded_frontiers,
            "recorded_replica_frontiers": recorded_replica_frontiers,
            "wallclock_lag_last_recorded": format!("{wallclock_lag_last_recorded:?}"),
            "migrated_storage_collections": migrated_storage_collections,
            "maintenance_scheduled": maintenance_scheduled,
        }))
    }
}

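/// Prepares the given `StorageTxn` for use with a storage controller: ensures
/// a txn-wal shard has been allocated and recorded.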
pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
    if txn.get_txn_wal_shard().is_none() {
        let txns_id = ShardId::new();
        txn.write_txn_wal_shard(txns_id)?;
    }

    Ok(())
}

impl<T> Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>,
    Self: StorageController<Timestamp = T>,
{
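    /// Creates a new storage controller from the given configuration.
    ///
    /// Expects [`prepare_initialization`] to have been run on `txn` first, so
    /// that a txn-wal shard is available.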
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let mut txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;
            txns.upgrade_version().await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }

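    /// Installs the given hold policies on the given collections and derives
    /// new since frontiers from the current write frontiers.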
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                let new_derived_since = policy.frontier(write_frontier);
                let update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

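    /// Updates the recorded write frontier of the given collection and, via
    /// its hold policy, the derived since frontier.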
    #[instrument(level = "debug", fields(updates))]
    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
        let mut read_capability_changes = BTreeMap::default();

        if let Some(collection) = self.collections.get_mut(&id) {
            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => (
                    &mut ingestion.write_frontier,
                    &mut ingestion.derived_since,
                    &ingestion.hold_policy,
                ),
                CollectionStateExtra::None => {
                    // Frontier updates for `Progress` collections are ignored
                    // here; any other collection without extra state should
                    // not receive them.
                    if !matches!(collection.data_source, DataSource::Progress) {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
                CollectionStateExtra::Export(export) => (
                    &mut export.write_frontier,
                    &mut export.derived_since,
                    &export.read_policy,
                ),
            };

            if PartialOrder::less_than(write_frontier, new_upper) {
                write_frontier.clone_from(new_upper);
            }

            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
            let update = swap_updates(derived_since, new_derived_since);
            if !update.is_empty() {
                read_capability_changes.insert(id, update);
            }
        } else if self.dropped_objects.contains_key(&id) {
            // Frontier updates for objects that were dropped but not yet fully
            // retired are expected; ignore them.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

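    /// Applies `updates` to the tracked read capabilities, downgrades
    /// dependency read holds accordingly, and sends `AllowCompaction` commands
    /// to the affected instances.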
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        // Location to record the net consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the maximum id, and the updates for it.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Apply the net changes: downgrade dependency read holds and send
        // `AllowCompaction` commands.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }

    fn validate_collection_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.storage_collections.check_exists(id)?;
        }
        Ok(())
    }

    fn validate_export_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.export(id)?;
        }
        Ok(())
    }

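    /// Opens a write handle for the given collection's data shard.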
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // Ensure the handle knows a recent upper before we hand it out.
        write.fetch_recent_upper().await;

        write
    }

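    /// Registers the given introspection collection with the collection
    /// manager, either as a differential or an append-only collection
    /// depending on its introspection type.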
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }

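    /// Removes statistics entries for sources and sinks that no longer exist.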
    fn reconcile_dangling_statistics(&self) {
        self.source_statistics
            .lock()
            .expect("poisoned")
            .source_statistics
            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
        self.sink_statistics
            .lock()
            .expect("poisoned")
            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
    }

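    /// Appends (or retracts, for a negative `diff`) mappings from global IDs
    /// to their data shards in the `ShardMapping` introspection collection.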
    #[instrument(level = "debug")]
    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
    where
        I: Iterator<Item = GlobalId>,
    {
        mz_ore::soft_assert_or_log!(
            diff == Diff::MINUS_ONE || diff == Diff::ONE,
            "use 1 for insert or -1 for delete"
        );

        let id = *self
            .introspection_ids
            .get(&IntrospectionType::ShardMapping)
            .expect("should be registered before this call");

        let mut updates = vec![];
        let mut row_buf = Row::default();

        for global_id in global_ids {
            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
                collection.collection_metadata.data_shard.clone()
            } else {
                panic!("unknown global id: {}", global_id);
            };

            let mut packer = row_buf.packer();
            packer.push(Datum::from(global_id.to_string().as_str()));
            packer.push(Datum::from(shard_id.to_string().as_str()));
            updates.push((row_buf.clone(), diff));
        }

        self.collection_manager.differential_append(id, updates);
    }

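    /// Determines the dependencies of the described collection, i.e. the
    /// collections on which it needs to hold read holds.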
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport { ingestion_id, .. } => {
                // Ingestion exports depend on their primary source's remap
                // collection.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                dependencies.extend([self_id, ingestion_remap_collection_id]);
            }
            DataSource::Ingestion(ingestion) => {
                // An ingestion depends on itself and on its remap collection.
                dependencies.push(self_id);
                if self_id != ingestion.remap_collection_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => {
                dependencies.extend([self_id, desc.sink.from]);
            }
        };

        Ok(dependencies)
    }

    async fn read_handle_for_snapshot(
        &self,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let metadata = self.storage_collections.collection_metadata(id)?;
        read_handle_for_snapshot(&self.persist, id, &metadata).await
    }

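    /// Appends the given status updates to the source/sink status history
    /// introspection collections. Does nothing in read-only mode.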
    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
        if self.read_only {
            return;
        }

        let mut sink_status_updates = vec![];
        let mut source_status_updates = vec![];

        for update in updates {
            let id = update.id;
            if self.export(id).is_ok() {
                sink_status_updates.push(update);
            } else if self.storage_collections.check_exists(id).is_ok() {
                source_status_updates.push(update);
            }
        }

        self.append_status_introspection_updates(
            IntrospectionType::SourceStatusHistory,
            source_status_updates,
        );
        self.append_status_introspection_updates(
            IntrospectionType::SinkStatusHistory,
            sink_status_updates,
        );
    }

    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
        self.collections
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))
    }

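    /// Sends a `RunIngestion` command to the instance responsible for the
    /// identified ingestion.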
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }

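    /// Sends a `RunSink` command to the instance responsible for the
    /// identified export.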
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: description.sink.commit_interval,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }

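    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the current frontiers against the previously
    /// recorded ones.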
    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }

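    /// Refreshes the per-collection wallclock lag metrics and histogram
    /// stashes, based on the current write frontiers.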
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lags are reported to metrics as the maximum value.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }

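    /// Flushes stashed wallclock lag measurements to the corresponding
    /// introspection collections, if the recording interval has elapsed.
    /// Does nothing in read-only mode.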
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

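    /// Performs periodic maintenance work: refreshing introspection
    /// collections, wallclock lag tracking, and instance metrics.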
    fn maintain(&mut self) {
        self.update_frontier_introspection();
        self.refresh_wallclock_lag();

        for instance in self.instances.values_mut() {
            instance.refresh_state_metrics();
        }
    }
}

impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}

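/// Reads the rows of the identified statistics collection at the latest
/// readable time (one step back from `upper`), or returns an empty `Vec` if
/// nothing can be read yet.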
async fn snapshot_statistics<T>(
    id: GlobalId,
    upper: Antichain<T>,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Vec<Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    match upper.as_option() {
        Some(f) if f > &T::minimum() => {
            let as_of = f.step_back().unwrap();

            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
            snapshot
                .into_iter()
                .map(|(row, diff)| {
                    assert_eq!(diff, 1);
                    row
                })
                .collect()
        }
        // Either the collection is closed (empty upper) or it contains no
        // data yet (upper at the minimum); there is nothing to read.
        _ => Vec::new(),
    }
}

async fn read_handle_for_snapshot<T>(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}

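/// State maintained by the storage controller about an individual collection.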
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The source of this collection's data.
    pub data_source: DataSource<T>,

    pub collection_metadata: CollectionMetadata,

    pub extra_state: CollectionStateExtra<T>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory`
    /// introspection update.
    wallclock_lag_max: WallclockLag,
    /// Frontier wallclock lag measurements stashed until the next
    /// `WallclockLagHistogram` introspection update.
    ///
    /// Keys are `(period, lag bucket, labels)` triples, values are counts.
    /// If this is `None`, wallclock lag is not tracked for this collection.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    /// Per-collection wallclock lag metrics.
    wallclock_lag_metrics: WallclockLagMetrics,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    fn new(
        data_source: DataSource<T>,
        collection_metadata: CollectionMetadata,
        extra_state: CollectionStateExtra<T>,
        wallclock_lag_metrics: WallclockLagMetrics,
    ) -> Self {
        // `DataSource::Other` collections are not tracked in the wallclock
        // lag histogram.
        let wallclock_lag_histogram_stash = match &data_source {
            DataSource::Other => None,
            _ => Some(Default::default()),
        };

        Self {
            data_source,
            collection_metadata,
            extra_state,
            wallclock_lag_max: WallclockLag::MIN,
            wallclock_lag_histogram_stash,
            wallclock_lag_metrics,
        }
    }
}

/// Additional state the controller maintains for ingestions and exports.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    Ingestion(IngestionState<T>),
    Export(ExportState<T>),
    None,
}

/// State the controller maintains for each ingestion.
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Read capabilities held on this ingestion, from which net changes to
    /// `derived_since` are computed.
    pub read_capabilities: MutableAntichain<T>,

    /// The since frontier, derived from `write_frontier` using `hold_policy`.
    pub derived_since: Antichain<T>,

    /// Read holds this ingestion has on its dependencies.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// The most recently reported write frontier.
    pub write_frontier: Antichain<T>,

    /// The policy that drives how the since is derived from the upper.
    pub hold_policy: ReadPolicy<T>,

    /// The ID of the instance in which the ingestion is running.
    pub instance_id: StorageInstanceId,

    /// The set of replicas on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}

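/// Describes one of the `*_status_history` introspection collections: its
/// retention policy and how to extract keys and event times from its rows.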
struct StatusHistoryDesc<K> {
    retention_policy: StatusHistoryRetentionPolicy,
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}
enum StatusHistoryRetentionPolicy {
    /// Keep only the last N updates for each key.
    LastN(usize),
    /// Keep only updates that fall within some time window.
    TimeWindow(Duration),
}

fn source_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_source_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn sink_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_sink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_privatelink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
    let desc = &REPLICA_STATUS_HISTORY_DESC;
    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
            params.replica_status_history_retention_window,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
                datums[process_idx].unwrap_uint64(),
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

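/// Replaces `from` with `replace_with` if the latter is at least as advanced,
/// returning a `ChangeBatch` describing the net change.
///
/// For example (a sketch, ignoring the generic context): swapping
/// `Antichain::from_elem(3)` in over `Antichain::from_elem(1)` yields the
/// change batch `{(3, +1), (1, -1)}`.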
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}