use std::any::Any;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::num::NonZeroI64;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
    WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::RustType;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
    TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    SinkStatisticsUpdate, SourceStatisticsUpdate, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};
use crate::statistics::StatsState;

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

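/// Tracks an in-flight oneshot ingestion so that its result callback can be
/// invoked once the ingestion completes or is canceled.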
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to report the result of the ingestion.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// The cluster on which the ingestion is running.
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
    /// Consume the pending ingestion, notifying the caller that it was canceled.
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

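/// A storage controller for a storage instance.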
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,
    /// The fencing token for this process, used to order concurrent
    /// `environmentd` deployments.
    envd_epoch: NonZeroI64,

    /// Whether this controller is in read-only mode. While read-only, neither
    /// the controller nor the instances it controls may affect changes to
    /// external systems (largely persist).
    read_only: bool,

    /// The state of the collections known to this controller.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Objects that have been dropped but whose dataflows were still running
    /// on some replicas when the drop was processed, mapped to the replicas
    /// from which we still expect a `DroppedId` response.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Worker that owns the persist write handles for tables.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// Read-side handle for the txn-wal shard.
    txns_read: TxnsRead<T>,
    /// Metrics for txn-wal.
    txns_metrics: Arc<TxnMetrics>,
    /// Responses from instances that have yet to be processed.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Sender for the channel over which dropped table write handles are reported.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Receiver for the channel over which dropped table write handles are reported.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Oneshot ingestions that are currently in progress.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Tracks which collection is responsible for which [`IntrospectionType`].
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for tasks that drive updating introspection collections.
    /// Dropping a token stops the corresponding task.
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    /// Consolidated source statistics, shared with the statistics scraper tasks.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Consolidated sink statistics, shared with the statistics scraper tasks.
    sink_statistics: Arc<Mutex<BTreeMap<GlobalId, statistics::StatsState<SinkStatisticsUpdate>>>>,
    /// A watch channel for broadcasting the statistics collection interval.
    statistics_interval_sender: Sender<Duration>,

    /// Clients for all known storage instances.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration to apply to newly provisioned instances, and to
    /// use when processing commands.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,
    /// Metrics of the storage controller.
    metrics: StorageControllerMetrics,
    /// The (read, write) frontiers most recently recorded in introspection.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// Per-replica write frontiers most recently recorded in introspection.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function that computes the lag between the given time and wallclock time.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle for managing storage collections, their metadata, and read holds.
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Migrated storage collections that can be written to even in read-only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Shared transmit channel for replicas to send responses.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receive end for replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task that warms persist state; dropped (aborting the task)
    /// once bootstrapping is complete.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

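/// Warms persist state for `shard_ids` in a background task.
///
/// Creating a batch fetcher for a shard fetches and caches its state, which
/// makes later opens of the same shard cheaper. The returned handle may be
/// dropped to abort any remaining warming work.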
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    // Bound the number of shards we warm concurrently, to limit the load on persist.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        // Apply the dyncfg updates to the local persist cache.
        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

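    /// Determines whether the given collection is hydrated on any replica.
    /// Collections that are not ingestions, and ingestions on clusters
    /// without replicas, are considered hydrated.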
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on clusters without replicas are considered hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                // An ingestion is hydrated if it is hydrated on at least one replica.
                Ok(!ingestion_state.hydrated_on.is_empty())
            }
            CollectionStateExtra::Export(_) => {
                // Exports are always considered hydrated.
                Ok(true)
            }
            CollectionStateExtra::None => {
                // Collections that are not ingestions are always considered hydrated.
                Ok(true)
            }
        }
    }

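    /// Returns whether all non-transient, non-excluded collections are
    /// hydrated, either on any replica or, if `target_replica_ids` is given,
    /// on at least one of the target replicas.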
    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => {
                            // No target replicas given, so check whether the
                            // ingestion is hydrated on any replica.
                            !state.hydrated_on.is_empty()
                        }
                    }
                }
                CollectionStateExtra::Export(_) => {
                    // Exports are always considered hydrated.
                    true
                }
                CollectionStateExtra::None => {
                    // Collections that are not ingestions are always
                    // considered hydrated.
                    true
                }
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
            }
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        // Exports track their own frontiers, so we pull them out of the
        // requested set and answer them from the export state directly.
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestions(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        Box::new(self.instances[&instance_id].active_ingestions())
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

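    /// Creates a new storage instance and registers a client for it. Panics
    /// if an instance with the given ID already exists.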
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            self.envd_epoch,
            metrics,
            Arc::clone(self.config().config_set()),
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

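    /// Removes the given replica from its instance. Objects that were running
    /// on the replica are marked as paused in the status history collections,
    /// since they no longer make progress on that replica.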
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for id in instance.active_ingestions() {
            // The replica is going away, so we no longer expect a `DroppedId`
            // response from it.
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // Emit paused statuses for the ingestion's collections, except for
            // the remap collection, which doesn't get its own status.
            let subsource_ids = ingestion_description
                .collection_ids()
                .filter(|id| id != &ingestion_description.remap_collection_id);
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

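    /// During bootstrap, evolves the persisted schemas of the given
    /// collections to match their in-memory `RelationDesc`s, e.g. after
    /// nullability of a column has changed.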
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

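    /// Creates and registers the given collections, opening persist handles
    /// for them and starting any ingestions or exports that should run.
    ///
    /// `register_ts` must be provided when any of the collections is a table.
    /// Collections in `migrated_storage_collections` receive special handling
    /// in read-only mode.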
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // The warming task has served its purpose by now; stop it.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state. Exact duplicates are
        // fine, but reusing an ID for a different collection is not.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // Augment each description with the shards it needs from persist.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
                    Ok(shard)
                };

                let remap_shard = match &description.data_source {
                    // Only ingestions can have remap shards.
                    DataSource::Ingestion(IngestionDescription {
                        remap_collection_id,
                        ..
                    }) => {
                        Some(get_shard(*remap_collection_id)?)
                    }
                    _ => None,
                };

                // Determine if this collection participates in txn-wal.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    remap_shard,
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // Open persist handles for all collections, with bounded concurrency.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!(
                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
                        id, metadata.remap_shard, metadata.data_shard
                    );

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        // The set of collections we should execute (i.e. start ingestions or
        // exports for) once registration is complete.
        let mut to_execute = BTreeSet::new();
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        to_register.sort_by_key(|(id, ..)| *id);

        // Register tables first, in reverse ID order, then all other collections.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics entries to initialize for the new collections.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let mut data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            // Ensure that the ingestion tracks its primary source export.
            if let DataSource::Ingestion(ingestion) = &mut data_source {
                let export = ingestion.desc.primary_source_export();
                ingestion.source_exports.insert(id, export);
            }

            let write_frontier = write.upper();

            // Acquire read holds on all dependencies of this collection.
            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            // Sanity-check the relationship between the dependency since and
            // the collection's upper. Collections in txns and sinks follow
            // different frontier rules and are exempt.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                    dependent ({id}): upper {:?} \n
                    dependency since {:?} \n
                    dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the primary source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // The ingestion, not the export itself, is what gets executed.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            // Ensure all new collections have statistics entries, so that the
            // statistics scrapers don't miss them.
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");

            for id in new_source_statistic_entries {
                source_statistics
                    .source_statistics
                    .entry(id)
                    .or_insert_with(|| StatsState::new(SourceStatisticsUpdate::new(id)));
            }
            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }
            for id in new_sink_statistic_entries {
                sink_statistics
                    .entry(id)
                    .or_insert_with(|| StatsState::new(SinkStatisticsUpdate::new(id)));
            }
        }

        // Register the tables all in one batch.
        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we only register migrated tables, which
                // we are allowed to write to even while read-only.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // Execute the ingestions and exports we created above.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    // In read-only mode, only sources that support running
                    // read-only may be started.
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table { .. }
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

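    /// Validates that the proposed `SourceDesc` is a compatible alteration of
    /// the ingestion's current `SourceDesc`.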
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let source_collection = self.collection(ingestion_id)?;
        let data_source = &source_collection.data_source;
        match &data_source {
            DataSource::Ingestion(cur_ingestion) => {
                cur_ingestion
                    .desc
                    .alter_compatible(ingestion_id, source_desc)?;
            }
            o => {
                tracing::info!(
                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
                    o
                );
                Err(AlterError { id: ingestion_id })?
            }
        }

        Ok(())
    }

    async fn alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.check_alter_ingestion_source_desc(ingestion_id, &source_desc)?;

        // The check above validates that the collection exists and refers to
        // an ingestion.
        let collection = self
            .collections
            .get_mut(&ingestion_id)
            .expect("validated exists");
        let curr_ingestion = match &mut collection.data_source {
            DataSource::Ingestion(curr_ingestion) => curr_ingestion,
            _ => unreachable!("verified collection refers to ingestion"),
        };

        curr_ingestion.desc = source_desc;
        tracing::debug!("altered {ingestion_id}'s SourceDesc");

        Ok(())
    }

    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_connections(source_connections.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (id, conn) in source_connections {
            let collection = self
                .collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is nothing to do.
                    if ingestion.desc.connection != conn {
                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                        ingestions_to_run.insert(id);
                    } else {
                        tracing::warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_export_data_configs(source_exports.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // Update the data config on the source export's own state.
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Also update the data config stored on the primary ingestion.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

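    /// Alters a table by installing a new version of its `RelationDesc` under
    /// `new_collection`, registering the new version with the table worker at
    /// `register_ts`, and pointing the existing collection at the new one.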
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if !matches!(existing.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Let StorageCollections run its validations and act on the
            // alteration first.
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // The new version is a table, so it participates in txn-wal.
        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            remap_shard: None,
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            collection_desc.data_source.clone(),
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        // Point the existing collection at the new version.
        self.collections.insert(new_collection, collection_state);
        let existing = self
            .collections
            .get_mut(&existing_collection)
            .expect("missing existing collection");
        assert!(matches!(
            existing.data_source,
            DataSource::Table { primary: None }
        ));
        existing.data_source = DataSource::Table {
            primary: Some(new_collection),
        };

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        Ok(())
    }

    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get(&id)
            .and_then(|c| match &c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get_mut(&id)
            .and_then(|c| match &mut c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

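    /// Starts a oneshot ingestion on the given instance, retaining its result
    /// callback so that staged batches can be returned once they arrive.
    /// Returns an error in read-only mode.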
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }

    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        // Notify the caller that the ingestion was canceled.
        pending.cancel();

        Ok(())
    }

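    /// Replaces the description of an existing export, acquiring new read
    /// holds and restarting the sink dataflow with the updated description.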
    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        // Acquire read holds at StorageCollections to ensure that the sinked
        // collection is not dropped while we are updating the sink.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Check whether the sink's input is readable at its write frontier.
        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot: new_description.sink.with_snapshot,
                to_storage_metadata,
            },
        };

        // Fetch the client for this export's cluster.
        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }

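    /// Updates the connections of the given exports and restarts their sink
    /// dataflows with the new connections in place.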
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            // Stage the changes in `new_export_description` and only apply
            // them after the alteration has been validated.
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export known to exist")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export known to exist")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Ensure the new connection is a compatible alteration of the old one.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the client for this batch of exports' cluster.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update our state only after all possible errors have occurred.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }

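    /// Drops the given table collections at `ts`. Collections backed by table
    /// write handles are torn down via the persist table worker, while
    /// webhook and ingestion-export collections are dropped like sources.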
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Collections with a `Table` data source have write handles at the
        // table worker that need dropping; the rest are dropped like sources.
        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
            .into_iter()
            .partition(|id| match self.collections[id].data_source {
                DataSource::Table { .. } => true,
                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                _ => panic!("identifier is not a table: {}", id),
            });

        // Drop the table write handles and report the IDs back once the
        // handles have actually been dropped.
        if !table_write_ids.is_empty() {
            let drop_notif = self
                .persist_table_worker
                .drop_handles(table_write_ids.clone(), ts);
            let tx = self.pending_table_handle_drops_tx.clone();
            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
                drop_notif.await;
                for identifier in table_write_ids {
                    let _ = tx.send(identifier);
                }
            });
        }

        if !data_source_ids.is_empty() {
            self.validate_collection_ids(data_source_ids.iter().cloned())?;
            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
        }

        Ok(())
    }

    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_collection_ids(identifiers.iter().cloned())?;
        self.drop_sources_unvalidated(storage_metadata, identifiers)
    }

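    /// Drops the given source collections without validating the identifiers:
    /// releases their read holds, updates introspection and statistics, and
    /// re-runs any ingestions whose set of exports changed.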
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Ingestions (and their exports) are handled differently from other
        // collections, so we track them separately.
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Unregister the collection from the collection
                        // manager in the background.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping an export removes it from its parent
                        // ingestion's source exports, so the ingestion must
                        // be re-executed to pick up the change.
                        ingestions_to_execute.insert(ingestion_id);

                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not re-execute ingestions that are being dropped themselves.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Drop the ingestions by installing read policies that require an
        // empty frontier, which releases all read holds.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Update the shard mapping introspection for everything we dropped.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics.source_statistics.remove(&id);
                source_statistics.webhook_statistics.remove(&id);
            }
        }

        // Remove collection state, remembering which replicas still run the
        // dropped objects so we can match up later `DroppedId` responses.
        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    // The remap collection is dropped together with its
                    // ingestion, so expect `DroppedId` responses for it too.
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            self.dropped_objects.insert(
                                ingestion_desc.remap_collection_id,
                                active_replicas.clone(),
                            );
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }

    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_export_ids(identifiers.iter().cloned())?;
        self.drop_sinks_unvalidated(storage_metadata, identifiers);
        Ok(())
    }

    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        // Ignore exports that have already been removed.
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        // Drop the sinks by installing read policies that require an empty
        // frontier, which releases all read holds.
        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        let status_now = mz_ore::now::to_datetime((self.now)());

        // Record a dropped status for each sink and drop its statistics.
        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.remove(id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        // Remove export state, remembering which replicas still run the
        // dropped sinks so we can match up later `DroppedId` responses.
        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }

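    /// Appends the given updates to the identified tables at `write_ts` and
    /// advances their uppers to `advance_to`. In read-only mode, only writes
    /// to migrated system tables are permitted.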
    #[instrument(level = "debug")]
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        if self.read_only {
            // In read-only mode, we only allow writes to migrated system
            // tables, for which we are responsible.
            if !commands
                .iter()
                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
            {
                return Err(StorageError::ReadOnly);
            }
        }

        // Validate that the timestamps are not malformed: any non-empty
        // update batch must advance the upper.
        for (id, updates) in commands.iter() {
            if !updates.is_empty() {
                if !write_ts.less_than(&advance_to) {
                    return Err(StorageError::UpdateBeyondUpper(*id));
                }
            }
        }

        Ok(self
            .persist_table_worker
            .append(write_ts, advance_to, commands))
    }

    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }

    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
        let source_statistics = self.source_statistics.lock().expect("poisoned");
        source_statistics
            .webhook_statistics
            .get(&id)
            .cloned()
            .ok_or(StorageError::IdentifierMissing(id))
    }

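    /// Waits until the controller is ready to process a response, i.e. until
    /// there is work to do: scheduled maintenance, pending table handle
    /// drops, or responses from an instance.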
    async fn ready(&mut self) {
        if self.maintenance_scheduled {
            return;
        }

        if !self.pending_table_handle_drops_rx.is_empty() {
            // Pending table drops are handled in `process`, so signal
            // readiness immediately.
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                // Drain any further responses that are already available, so
                // a single `process` call can handle them as a batch.
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }

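    /// Processes the work queued up since the last call: runs scheduled
    /// maintenance, rehydrates failed replicas, handles stashed instance
    /// responses (frontier updates, dropped IDs, statistics, status updates,
    /// staged batches), and drops tables whose handles were released.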
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (_replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    {
                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");
                        for stat in source_stats {
                            // `and_modify` only updates statistics for
                            // collections we still track; updates for removed
                            // collections are discarded.
                            shared_stats
                                .source_statistics
                                .entry(stat.id)
                                .and_modify(|current| current.stat().incorporate(stat));
                        }
                    }

                    {
                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
                        for stat in sink_stats {
                            shared_stats
                                .entry(stat.id)
                                .and_modify(|current| current.stat().incorporate(stat));
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    match status_update.status {
                        Status::Running => {
                            if let Some(collection) = self.collections.get_mut(&status_update.id) {
                                match collection.extra_state {
                                    CollectionStateExtra::Ingestion(ref mut ingestion_state) => {
                                        if ingestion_state.hydrated_on.is_empty() {
                                            tracing::debug!(
                                                ingestion_id = %status_update.id,
                                                "ingestion is hydrated"
                                            );
                                        }
                                        ingestion_state.hydrated_on.insert(replica_id.expect(
                                            "replica id should be present for status running",
                                        ));
                                    }
                                    CollectionStateExtra::Export(_) => {
                                        // Exports don't track hydration.
                                    }
                                    CollectionStateExtra::None => {}
                                }
                            }
                        }
                        Status::Paused => {
                            if let Some(collection) = self.collections.get_mut(&status_update.id) {
                                match collection.extra_state {
                                    CollectionStateExtra::Ingestion(ref mut ingestion_state) => {
                                        tracing::debug!(
                                            ingestion_id = %status_update.id,
                                            "ingestion is now paused"
                                        );
                                        // A paused ingestion is no longer
                                        // hydrated on any replica.
                                        ingestion_state.hydrated_on.clear();
                                    }
                                    CollectionStateExtra::Export(_) => {
                                        // Exports don't track hydration.
                                    }
                                    CollectionStateExtra::None => {}
                                }
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Let the instance know the oneshot ingestion
                                // is complete.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                // Hand the staged batches to the requester.
                                (pending.result_tx)(batches)
                            }
                            None => {
                                // The ingestion was canceled before its
                                // batches were staged; nothing to do.
                            }
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }

    async fn inspect_persist_state(
        &self,
        id: GlobalId,
    ) -> Result<serde_json::Value, anyhow::Error> {
        let collection = &self.storage_collections.collection_metadata(id)?;
        let client = self
            .persist
            .open(collection.persist_location.clone())
            .await?;
        let shard_state = client
            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
            .await?;
        let json_state = serde_json::to_value(shard_state)?;
        Ok(json_state)
    }

    fn append_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<(Row, Diff)>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates = updates.into_iter().map(|update| update.into()).collect();
        self.collection_manager.blind_write(id, updates);
    }

    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
        if !updates.is_empty() {
            self.collection_manager.blind_write(id, updates);
        }
    }

    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }

    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }

    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }

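    /// Returns a future that resolves to a "real-time recency" timestamp for
    /// the given objects: roughly, a timestamp that reflects the data visible
    /// in the sources' upstream systems at the time of this call. Objects
    /// that are not user sources with a queryable upstream (e.g. load
    /// generators) are skipped. Each per-source probe is subject to
    /// `timeout`.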
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Real-time recency is only probed for user sources.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }
                    // Load generators have no upstream to probe.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold back the remap collection's since while we probe, so the
            // subscribe below cannot fail.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(
                        ?id,
                        type_ = source_conn.name(),
                        upstream = ?source_conn.external_reference(),
                        "fetching real time recency"
                    );

                    let result =
                        rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
                            .await
                            .map_err(|e| {
                                tracing::debug!(?id, "real time recency error: {:?}", e);
                                e
                            });

                    // Keep the read hold alive until the subscribe is done
                    // with it.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
}

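/// Prepares the given storage metadata for controller initialization by
/// allocating a txn WAL shard if none exists yet.
///
/// Must be called before [`Controller::new`], which expects the shard to be
/// present.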
pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
    if txn.get_txn_wal_shard().is_none() {
        let txns_id = ShardId::new();
        txn.write_txn_wal_shard(txns_id)?;
    }

    Ok(())
}

impl<T> Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    Self: StorageController<Timestamp = T>,
{
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            envd_epoch,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }

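    /// Installs the given read hold policies on the identified ingestions and
    /// exports, recomputing their derived sinces and propagating any
    /// resulting capability changes.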
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) =
                    match &mut collection.extra_state {
                        CollectionStateExtra::Ingestion(ingestion) => (
                            ingestion.write_frontier.borrow(),
                            &mut ingestion.derived_since,
                            &mut ingestion.hold_policy,
                        ),
                        CollectionStateExtra::None => {
                            unreachable!(
                                "set_hold_policies is only called for ingestions and exports"
                            );
                        }
                        CollectionStateExtra::Export(export) => (
                            export.write_frontier.borrow(),
                            &mut export.derived_since,
                            &mut export.read_policy,
                        ),
                    };

                let new_derived_since = policy.frontier(write_frontier);
                let update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

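    /// Advances the tracked write frontier of the identified ingestion or
    /// export and lets its read hold policy derive a (possibly) new since.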
    #[instrument(level = "debug", fields(updates))]
    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
        let mut read_capability_changes = BTreeMap::default();

        if let Some(collection) = self.collections.get_mut(&id) {
            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => (
                    &mut ingestion.write_frontier,
                    &mut ingestion.derived_since,
                    &ingestion.hold_policy,
                ),
                CollectionStateExtra::None => {
                    // Upper updates for progress collections are expected and
                    // need no handling here.
                    if !matches!(collection.data_source, DataSource::Progress) {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
                CollectionStateExtra::Export(export) => (
                    &mut export.write_frontier,
                    &mut export.derived_since,
                    &export.read_policy,
                ),
            };

            if PartialOrder::less_than(write_frontier, new_upper) {
                write_frontier.clone_from(new_upper);
            }

            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
            let update = swap_updates(derived_since, new_derived_since);
            if !update.is_empty() {
                read_capability_changes.insert(id, update);
            }
        } else if self.dropped_objects.contains_key(&id) {
            // Upper updates for dropped objects that still have running
            // replicas are expected; ignore them.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

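    /// Applies a batch of read capability changes, downgrades the affected
    /// collections' dependency read holds accordingly, and sends
    /// `AllowCompaction` commands to the responsible instances.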
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        // Location to record the net changes we need to act on.
        let mut collections_net = BTreeMap::new();

        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Translate the net changes into read hold downgrades and
        // `AllowCompaction` commands.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }

    fn validate_collection_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.storage_collections.check_exists(id)?;
        }
        Ok(())
    }

    fn validate_export_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.export(id)?;
        }
        Ok(())
    }

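    /// Opens a persist write handle for the given collection's data shard,
    /// fetching its most recent upper.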
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // Ensure the handle's view of the upper is current before handing it
        // out.
        write.fetch_recent_upper().await;

        write
    }

    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // Migrated collections must stay writable even in read-only mode.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }

    /// Removes statistics entries for sources and sinks that no longer exist.
    fn reconcile_dangling_statistics(&self) {
        self.source_statistics
            .lock()
            .expect("poisoned")
            .source_statistics
            .retain(|k, _| self.storage_collections.check_exists(*k).is_ok());
        self.sink_statistics
            .lock()
            .expect("poisoned")
            .retain(|k, _| self.export(*k).is_ok());
    }

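    /// Records the mapping from the given global IDs to their data shards in
    /// the `ShardMapping` introspection collection; `diff` selects between
    /// insertion (+1) and deletion (-1).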
    #[instrument(level = "debug")]
    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
    where
        I: Iterator<Item = GlobalId>,
    {
        mz_ore::soft_assert_or_log!(
            diff == Diff::MINUS_ONE || diff == Diff::ONE,
            "use 1 for insert or -1 for delete"
        );

        let id = *self
            .introspection_ids
            .get(&IntrospectionType::ShardMapping)
            .expect("should be registered before this call");

        let mut updates = vec![];
        // Reuse one row allocation across updates.
        let mut row_buf = Row::default();

        for global_id in global_ids {
            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
                collection.collection_metadata.data_shard.clone()
            } else {
                panic!("unknown global id: {}", global_id);
            };

            let mut packer = row_buf.packer();
            packer.push(Datum::from(global_id.to_string().as_str()));
            packer.push(Datum::from(shard_id.to_string().as_str()));
            updates.push((row_buf.clone(), diff));
        }

        self.collection_manager.differential_append(id, updates);
    }

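    /// Determines the collections a collection with the given data source
    /// depends on, i.e. the collections it needs read holds on.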
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependency = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => vec![],
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport { ingestion_id, .. } => {
                // Ingestion exports depend on their primary source's remap
                // collection.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                vec![self_id, ingestion_remap_collection_id]
            }
            // Ingestions depend on their remap collection.
            DataSource::Ingestion(ingestion) => {
                vec![self_id, ingestion.remap_collection_id]
            }
            DataSource::Sink { desc } => {
                vec![self_id, desc.sink.from]
            }
        };

        Ok(dependency)
    }

    async fn read_handle_for_snapshot(
        &self,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let metadata = self.storage_collections.collection_metadata(id)?;
        read_handle_for_snapshot(&self.persist, id, &metadata).await
    }

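    /// Routes the given status updates to the source and sink status history
    /// introspection collections. Does nothing in read-only mode.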
    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
        if self.read_only {
            return;
        }

        let mut sink_status_updates = vec![];
        let mut source_status_updates = vec![];

        for update in updates {
            let id = update.id;
            if self.export(id).is_ok() {
                sink_status_updates.push(update);
            } else if self.storage_collections.check_exists(id).is_ok() {
                source_status_updates.push(update);
            }
        }

        self.append_status_introspection_updates(
            IntrospectionType::SourceStatusHistory,
            source_status_updates,
        );
        self.append_status_introspection_updates(
            IntrospectionType::SinkStatusHistory,
            sink_status_updates,
        );
    }

    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
        self.collections
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))
    }

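    /// Builds a `RunIngestionCommand` for the identified ingestion, enriching
    /// its source exports with their collection metadata, and sends it to the
    /// responsible storage instance.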
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich each source export with its collection's storage metadata.
        let mut source_exports = BTreeMap::new();
        for (
            export_id,
            SourceExport {
                storage_metadata: (),
                details,
                data_config,
            },
        ) in ingestion_description.source_exports.clone()
        {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details,
                    data_config,
                },
            );
        }

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            ingestion_metadata: collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }

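    /// Builds a `RunSinkCommand` for the identified export, deriving its
    /// effective as-of and snapshot behavior from the current collection
    /// frontiers, and sends it to the responsible storage instance.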
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // If the export has already made progress beyond its as-of, it has
        // presumably emitted its snapshot already, so don't request the
        // snapshot again.
        let enable_snapshot_frontier =
            dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER.get(self.config().config_set());
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = if enable_snapshot_frontier
            && PartialOrder::less_than(&as_of, &export_state.write_frontier)
        {
            false
        } else {
            description.sink.with_snapshot
        };

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }

    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Diff the new frontiers against the previously recorded ones,
        // retracting stale rows and inserting updated ones.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }

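    /// Refreshes wallclock lag introspection for all active collections:
    /// updates each collection's running maximum lag and, if enabled, stashes
    /// histogram buckets for later recording.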
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lags cannot be represented in the metrics, so report
            // the maximum value instead.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
                continue;
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by powers of two.
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }

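    /// Flushes the accumulated wallclock lag maxima and histogram stashes to
    /// the corresponding introspection collections, if a full recording
    /// interval has elapsed since the last recording. Does nothing in
    /// read-only mode.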
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // Record the maximum lag observed since the last recording, then
            // reset it.
            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

    fn maintain(&mut self) {
        self.update_frontier_introspection();
        self.refresh_wallclock_lag();

        for instance in self.instances.values_mut() {
            instance.refresh_state_metrics();
        }
    }
}

impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}

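/// Reads a snapshot of the statistics collection `id` as of one step before
/// `upper`, returning the contained rows. Returns an empty vector if the
/// collection has not advanced past the minimum timestamp.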
async fn snapshot_statistics<T>(
    id: GlobalId,
    upper: Antichain<T>,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Vec<Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    match upper.as_option() {
        Some(f) if f > &T::minimum() => {
            // The upper is beyond the minimum, so there is a readable time
            // one step back.
            let as_of = f.step_back().unwrap();

            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
            snapshot
                .into_iter()
                .map(|(row, diff)| {
                    assert_eq!(diff, 1);
                    row
                })
                .collect()
        }
        // The collection is closed or still at the minimum timestamp; there
        // is nothing to read.
        _ => Vec::new(),
    }
}

async fn read_handle_for_snapshot<T>(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}

/// State maintained by the storage controller for each collection.
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The source of this collection's data.
    pub data_source: DataSource<T>,

    pub collection_metadata: CollectionMetadata,

    pub extra_state: CollectionStateExtra<T>,

    /// Maximum wallclock lag observed since the last introspection recording.
    wallclock_lag_max: WallclockLag,
    /// Histogram buckets of wallclock lag, staged until the next recording.
    /// `None` for collections that don't track a lag histogram.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    wallclock_lag_metrics: WallclockLagMetrics,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    fn new(
        data_source: DataSource<T>,
        collection_metadata: CollectionMetadata,
        extra_state: CollectionStateExtra<T>,
        wallclock_lag_metrics: WallclockLagMetrics,
    ) -> Self {
        // `DataSource::Other` collections don't track a wallclock lag
        // histogram.
        let wallclock_lag_histogram_stash = match &data_source {
            DataSource::Other => None,
            _ => Some(Default::default()),
        };

        Self {
            data_source,
            collection_metadata,
            extra_state,
            wallclock_lag_max: WallclockLag::MIN,
            wallclock_lag_histogram_stash,
            wallclock_lag_metrics,
        }
    }
}

/// Additional state the controller maintains for ingestions and exports.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    Ingestion(IngestionState<T>),
    Export(ExportState<T>),
    None,
}

/// State maintained for each ingestion.
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Aggregate of the read holds on this ingestion.
    pub read_capabilities: MutableAntichain<T>,

    /// The since derived from the hold policy and the write frontier.
    pub derived_since: Antichain<T>,

    /// Read holds this ingestion maintains on its dependencies.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// The most recently reported write frontier.
    pub write_frontier: Antichain<T>,

    /// Policy that determines how `derived_since` follows the write frontier.
    pub hold_policy: ReadPolicy<T>,

    /// The storage instance that runs this ingestion.
    pub instance_id: StorageInstanceId,

    /// The replicas on which this ingestion is currently hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}

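/// Describes how to keep a status history introspection collection bounded:
/// which retention policy applies and how to extract the grouping key and
/// event time from a row's datums.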
struct StatusHistoryDesc<K> {
    retention_policy: StatusHistoryRetentionPolicy,
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}

enum StatusHistoryRetentionPolicy {
    /// Keep only the most recent N entries per key.
    LastN(usize),
    /// Keep only entries newer than the given window.
    TimeWindow(Duration),
}

fn source_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_source_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn sink_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_sink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_privatelink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
    let desc = &REPLICA_STATUS_HISTORY_DESC;
    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
            params.replica_status_history_retention_window,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
                datums[process_idx].unwrap_uint64(),
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

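/// Swaps `replace_with` into `from` when `from ≤ replace_with`, returning the
/// net change as a `ChangeBatch`: +1 for each element of the new frontier and
/// -1 for each element of the old one. For example, replacing `{1}` with
/// `{3}` yields the updates `[(3, +1), (1, -1)]`; if `replace_with` is not
/// beyond `from`, the frontier is left unchanged and the batch is empty.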
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}