use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
    WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::RustType;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
    TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

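/// A oneshot ingestion that has been submitted to a cluster but whose result has
/// not yet been reported back through `result_tx`.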
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

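/// A storage controller for a storage instance.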
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    build_info: &'static BuildInfo,
    now: NowFn,

    read_only: bool,

    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    statistics_interval_sender: Sender<Duration>,

    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    initialized: bool,
    config: StorageConfiguration,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    metrics: StorageControllerMetrics,
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    wallclock_lag_last_recorded: DateTime<Utc>,

    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    migrated_storage_collections: BTreeSet<GlobalId>,

    maintenance_ticker: tokio::time::Interval,
    maintenance_scheduled: bool,

    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

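/// Spawns a background task that opens batch fetchers for the given shards, warming up persist
/// state before the controller needs it.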
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

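    /// Updates storage configuration parameters, propagating them to the storage collections
    /// layer, the persist client cache, and all storage instances.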
    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

332 fn collection_metadata(
333 &self,
334 id: GlobalId,
335 ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
336 self.storage_collections.collection_metadata(id)
337 }
338
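    /// Reports whether the identified ingestion (or ingestion export) is hydrated on at least
    /// one replica. Non-ingestion collections, and ingestions on clusters without replicas,
    /// are considered hydrated.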
339 fn collection_hydrated(
340 &self,
341 collection_id: GlobalId,
342 ) -> Result<bool, StorageError<Self::Timestamp>> {
343 let collection = self.collection(collection_id)?;
344
345 let instance_id = match &collection.data_source {
346 DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
347 DataSource::IngestionExport { ingestion_id, .. } => {
348 let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
349
350 let instance_id = match &ingestion_state.data_source {
351 DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
352 _ => unreachable!("SourceExport must only refer to primary source"),
353 };
354
355 instance_id
356 }
357 _ => return Ok(true),
358 };
359
360 let instance = self.instances.get(&instance_id).ok_or_else(|| {
361 StorageError::IngestionInstanceMissing {
362 storage_instance_id: instance_id,
363 ingestion_id: collection_id,
364 }
365 })?;
366
367 if instance.replica_ids().next().is_none() {
368 return Ok(true);
371 }
372
373 match &collection.extra_state {
374 CollectionStateExtra::Ingestion(ingestion_state) => {
375 Ok(ingestion_state.hydrated_on.len() >= 1)
377 }
378 CollectionStateExtra::Export(_) => {
379 Ok(true)
384 }
385 CollectionStateExtra::None => {
386 Ok(true)
390 }
391 }
392 }
393
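    /// Reports whether all relevant ingestions on the target cluster are hydrated on the given
    /// replicas (or on any replica, if no replicas are given), ignoring transient and excluded
    /// collections.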
394 #[mz_ore::instrument(level = "debug")]
395 fn collections_hydrated_on_replicas(
396 &self,
397 target_replica_ids: Option<Vec<ReplicaId>>,
398 target_cluster_id: &StorageInstanceId,
399 exclude_collections: &BTreeSet<GlobalId>,
400 ) -> Result<bool, StorageError<Self::Timestamp>> {
401 if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
404 return Ok(true);
405 }
406
407 let target_replicas: Option<BTreeSet<ReplicaId>> =
410 target_replica_ids.map(|ids| ids.into_iter().collect());
411
412 let mut all_hydrated = true;
413 for (collection_id, collection_state) in self.collections.iter() {
414 if collection_id.is_transient() || exclude_collections.contains(collection_id) {
415 continue;
416 }
417 let hydrated = match &collection_state.extra_state {
418 CollectionStateExtra::Ingestion(state) => {
419 if &state.instance_id != target_cluster_id {
420 continue;
421 }
422 match &target_replicas {
423 Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
424 None => {
425 state.hydrated_on.len() >= 1
428 }
429 }
430 }
431 CollectionStateExtra::Export(_) => {
432 true
437 }
438 CollectionStateExtra::None => {
439 true
443 }
444 };
445 if !hydrated {
446 tracing::info!(%collection_id, "collection is not hydrated on any replica");
447 all_hydrated = false;
448 }
451 }
452 Ok(all_hydrated)
453 }
454
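    /// Returns the since (implied capability) and upper (write frontier) of the identified
    /// collection.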
455 fn collection_frontiers(
456 &self,
457 id: GlobalId,
458 ) -> Result<
459 (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
460 StorageError<Self::Timestamp>,
461 > {
462 let frontiers = self.storage_collections.collection_frontiers(id)?;
463 Ok((frontiers.implied_capability, frontiers.write_frontier))
464 }
465
466 fn collections_frontiers(
467 &self,
468 mut ids: Vec<GlobalId>,
469 ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
470 let mut result = vec![];
471 ids.retain(|&id| match self.export(id) {
476 Ok(export) => {
477 result.push((
478 id,
479 export.input_hold().since().clone(),
480 export.write_frontier.clone(),
481 ));
482 false
483 }
484 Err(_) => true,
485 });
486 result.extend(
487 self.storage_collections
488 .collections_frontiers(ids)?
489 .into_iter()
490 .map(|frontiers| {
491 (
492 frontiers.id,
493 frontiers.implied_capability,
494 frontiers.write_frontier,
495 )
496 }),
497 );
498
499 Ok(result)
500 }
501
502 fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
503 self.storage_collections.active_collection_metadatas()
504 }
505
506 fn active_ingestions(
507 &self,
508 instance_id: StorageInstanceId,
509 ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
510 Box::new(self.instances[&instance_id].active_ingestions())
511 }
512
513 fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
514 self.storage_collections.check_exists(id)
515 }
516
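    /// Creates a new storage instance, replaying the current initialization, read-only, and
    /// configuration state to it.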
517 fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
518 let metrics = self.metrics.for_instance(id);
519 let mut instance = Instance::new(
520 workload_class,
521 metrics,
522 Arc::clone(self.config().config_set()),
523 self.now.clone(),
524 self.instance_response_tx.clone(),
525 );
526 if self.initialized {
527 instance.send(StorageCommand::InitializationComplete);
528 }
529 if !self.read_only {
530 instance.send(StorageCommand::AllowWrites);
531 }
532
533 let params = Box::new(self.config.parameters.clone());
534 instance.send(StorageCommand::UpdateConfiguration(params));
535
536 let old_instance = self.instances.insert(id, instance);
537 assert_none!(old_instance, "storage instance {id} already exists");
538 }
539
540 fn drop_instance(&mut self, id: StorageInstanceId) {
541 let instance = self.instances.remove(&id);
542 assert!(instance.is_some(), "storage instance {id} does not exist");
543 }
544
545 fn update_instance_workload_class(
546 &mut self,
547 id: StorageInstanceId,
548 workload_class: Option<String>,
549 ) {
550 let instance = self
551 .instances
552 .get_mut(&id)
553 .unwrap_or_else(|| panic!("instance {id} does not exist"));
554
555 instance.workload_class = workload_class;
556 }
557
558 fn connect_replica(
559 &mut self,
560 instance_id: StorageInstanceId,
561 replica_id: ReplicaId,
562 location: ClusterReplicaLocation,
563 enable_ctp: bool,
564 ) {
565 let instance = self
566 .instances
567 .get_mut(&instance_id)
568 .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
569
570 let config = ReplicaConfig {
571 build_info: self.build_info,
572 location,
573 grpc_client: self.config.parameters.grpc_client.clone(),
574 enable_ctp,
575 };
576 instance.add_replica(replica_id, config);
577 }
578
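    /// Drops a replica from a storage instance, recording `Paused` status updates for the
    /// ingestions and sinks it was running and pruning it from the dropped-object tracking.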
579 fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
580 let instance = self
581 .instances
582 .get_mut(&instance_id)
583 .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
584
585 let status_now = mz_ore::now::to_datetime((self.now)());
586 let mut source_status_updates = vec![];
587 let mut sink_status_updates = vec![];
588
589 let make_update = |id, object_type| StatusUpdate {
590 id,
591 status: Status::Paused,
592 timestamp: status_now,
593 error: None,
594 hints: BTreeSet::from([format!(
595 "The replica running this {object_type} has been dropped"
596 )]),
597 namespaced_errors: Default::default(),
598 replica_id: Some(replica_id),
599 };
600
601 for id in instance.active_ingestions() {
602 if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
603 active_replicas.remove(&replica_id);
604 if active_replicas.is_empty() {
605 self.dropped_objects.remove(id);
606 }
607 }
608
609 let ingestion = self
610 .collections
611 .get_mut(id)
612 .expect("instance contains unknown ingestion");
613
614 let ingestion_description = match &ingestion.data_source {
615 DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
616 _ => panic!(
617 "unexpected data source for ingestion: {:?}",
618 ingestion.data_source
619 ),
620 };
621
622 let subsource_ids = ingestion_description
632 .collection_ids()
633 .filter(|id| id != &ingestion_description.remap_collection_id);
634 for id in subsource_ids {
635 source_status_updates.push(make_update(id, "source"));
636 }
637 }
638
639 for id in instance.active_exports() {
640 if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
641 active_replicas.remove(&replica_id);
642 if active_replicas.is_empty() {
643 self.dropped_objects.remove(id);
644 }
645 }
646
647 sink_status_updates.push(make_update(*id, "sink"));
648 }
649
650 instance.drop_replica(replica_id);
651
652 if !self.read_only {
653 if !source_status_updates.is_empty() {
654 self.append_status_introspection_updates(
655 IntrospectionType::SourceStatusHistory,
656 source_status_updates,
657 );
658 }
659 if !sink_status_updates.is_empty() {
660 self.append_status_introspection_updates(
661 IntrospectionType::SinkStatusHistory,
662 sink_status_updates,
663 );
664 }
665 }
666 }
667
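    /// Evolves the persisted schemas of the given collections to the provided relation
    /// descriptions, used during bootstrap to pick up nullability changes.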
668 async fn evolve_nullability_for_bootstrap(
669 &mut self,
670 storage_metadata: &StorageMetadata,
671 collections: Vec<(GlobalId, RelationDesc)>,
672 ) -> Result<(), StorageError<Self::Timestamp>> {
673 let persist_client = self
674 .persist
675 .open(self.persist_location.clone())
676 .await
677 .unwrap();
678
679 for (global_id, relation_desc) in collections {
680 let shard_id = storage_metadata.get_collection_shard(global_id)?;
681 let diagnostics = Diagnostics {
682 shard_name: global_id.to_string(),
683 handle_purpose: "evolve nullability for bootstrap".to_string(),
684 };
685 let latest_schema = persist_client
686 .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
687 .await
688 .expect("invalid persist usage");
689 let Some((schema_id, current_schema, _)) = latest_schema else {
690 tracing::debug!(?global_id, "no schema registered");
691 continue;
692 };
693 tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");
694
695 let diagnostics = Diagnostics {
696 shard_name: global_id.to_string(),
697 handle_purpose: "evolve nullability for bootstrap".to_string(),
698 };
699 let evolve_result = persist_client
700 .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
701 shard_id,
702 schema_id,
703 &relation_desc,
704 &UnitSchema,
705 diagnostics,
706 )
707 .await
708 .expect("invalid persist usage");
709 match evolve_result {
710 CaESchema::Ok(_) => (),
711 CaESchema::ExpectedMismatch {
712 schema_id,
713 key,
714 val: _,
715 } => {
716 return Err(StorageError::PersistSchemaEvolveRace {
717 global_id,
718 shard_id,
719 schema_id,
720 relation_desc: key,
721 });
722 }
723 CaESchema::Incompatible => {
724 return Err(StorageError::PersistInvalidSchemaEvolve {
725 global_id,
726 shard_id,
727 });
728 }
729 };
730 }
731
732 Ok(())
733 }
734
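    /// Creates the described collections, registering them with the persist workers and the
    /// collection manager as appropriate, and starts any ingestions and exports that may run
    /// in the current (read-only or read-write) mode.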
735 #[instrument(name = "storage::create_collections")]
754 async fn create_collections_for_bootstrap(
755 &mut self,
756 storage_metadata: &StorageMetadata,
757 register_ts: Option<Self::Timestamp>,
758 mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
759 migrated_storage_collections: &BTreeSet<GlobalId>,
760 ) -> Result<(), StorageError<Self::Timestamp>> {
761 self.migrated_storage_collections
762 .extend(migrated_storage_collections.iter().cloned());
763
764 self.storage_collections
765 .create_collections_for_bootstrap(
766 storage_metadata,
767 register_ts.clone(),
768 collections.clone(),
769 migrated_storage_collections,
770 )
771 .await?;
772
773 drop(self.persist_warm_task.take());
776
777 collections.sort_by_key(|(id, _)| *id);
782 collections.dedup();
783 for pos in 1..collections.len() {
784 if collections[pos - 1].0 == collections[pos].0 {
785 return Err(StorageError::CollectionIdReused(collections[pos].0));
786 }
787 }
788
789 let enriched_with_metadata = collections
791 .into_iter()
792 .map(|(id, description)| {
793 let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
794
795 let get_shard = |id| -> Result<ShardId, StorageError<T>> {
796 let shard = storage_metadata.get_collection_shard::<T>(id)?;
797 Ok(shard)
798 };
799
800 let remap_shard = match &description.data_source {
801 DataSource::Ingestion(IngestionDescription {
803 remap_collection_id,
804 ..
805 }) => {
806 Some(get_shard(*remap_collection_id)?)
809 }
810 _ => None,
811 };
812
813 let txns_shard = description
816 .data_source
817 .in_txns()
818 .then(|| *self.txns_read.txns_id());
819
820 let metadata = CollectionMetadata {
821 persist_location: self.persist_location.clone(),
822 remap_shard,
823 data_shard,
824 relation_desc: description.desc.clone(),
825 txns_shard,
826 };
827
828 Ok((id, description, metadata))
829 })
830 .collect_vec();
831
832 let persist_client = self
834 .persist
835 .open(self.persist_location.clone())
836 .await
837 .unwrap();
838 let persist_client = &persist_client;
839
840 use futures::stream::{StreamExt, TryStreamExt};
843 let this = &*self;
844 let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
845 .map(|data: Result<_, StorageError<Self::Timestamp>>| {
846 async move {
847 let (id, description, metadata) = data?;
848
849 debug!(
852 "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
853 id, metadata.remap_shard, metadata.data_shard
854 );
855
856 let write = this
857 .open_data_handles(
858 &id,
859 metadata.data_shard,
860 metadata.relation_desc.clone(),
861 persist_client,
862 )
863 .await;
864
865 Ok::<_, StorageError<T>>((id, description, write, metadata))
866 }
867 })
868 .buffer_unordered(50)
870 .try_collect()
883 .await?;
884
885 let mut to_execute = BTreeSet::new();
888 let mut new_collections = BTreeSet::new();
893 let mut table_registers = Vec::with_capacity(to_register.len());
894
895 to_register.sort_by_key(|(id, ..)| *id);
897
898 let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
904 .into_iter()
905 .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
906 let to_register = tables_to_register
907 .into_iter()
908 .rev()
909 .chain(collections_to_register.into_iter());
910
911 let mut new_source_statistic_entries = BTreeSet::new();
915 let mut new_webhook_statistic_entries = BTreeSet::new();
916 let mut new_sink_statistic_entries = BTreeSet::new();
917
918 for (id, description, write, metadata) in to_register {
919 let is_in_txns = |id, metadata: &CollectionMetadata| {
920 metadata.txns_shard.is_some()
921 && !(self.read_only && migrated_storage_collections.contains(&id))
922 };
923
924 let mut data_source = description.data_source;
925
926 to_execute.insert(id);
927 new_collections.insert(id);
928
929 if let DataSource::Ingestion(ingestion) = &mut data_source {
934 let export = ingestion.desc.primary_source_export();
935 ingestion.source_exports.insert(id, export);
936 }
937
938 let write_frontier = write.upper();
939
940 let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;
942
943 let dependency_read_holds = self
944 .storage_collections
945 .acquire_read_holds(storage_dependencies)
946 .expect("can acquire read holds");
947
948 let mut dependency_since = Antichain::from_elem(T::minimum());
949 for read_hold in dependency_read_holds.iter() {
950 dependency_since.join_assign(read_hold.since());
951 }
952
953 if !dependency_read_holds.is_empty()
962 && !is_in_txns(id, &metadata)
963 && !matches!(&data_source, DataSource::Sink { .. })
964 {
965 if dependency_since.is_empty() {
971 halt!(
972 "dependency since frontier is empty while dependent upper \
973 is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
974 this indicates concurrent deletion of a collection",
975 write_frontier,
976 dependency_read_holds,
977 );
978 }
979
980 mz_ore::soft_assert_or_log!(
998 write_frontier.elements() == &[T::minimum()]
999 || write_frontier.is_empty()
1000 || PartialOrder::less_than(&dependency_since, write_frontier),
1001 "dependency since has advanced past dependent ({id}) upper \n
1002 dependent ({id}): upper {:?} \n
1003 dependency since {:?} \n
1004 dependency read holds: {:?}",
1005 write_frontier,
1006 dependency_since,
1007 dependency_read_holds,
1008 );
1009 }
1010
1011 let mut extra_state = CollectionStateExtra::None;
1013 let mut maybe_instance_id = None;
1014 match &data_source {
1015 DataSource::Introspection(typ) => {
1016 debug!(
1017 ?data_source, meta = ?metadata,
1018 "registering {id} with persist monotonic worker",
1019 );
1020 self.register_introspection_collection(
1026 id,
1027 *typ,
1028 write,
1029 persist_client.clone(),
1030 )?;
1031 }
1032 DataSource::Webhook => {
1033 debug!(
1034 ?data_source, meta = ?metadata,
1035 "registering {id} with persist monotonic worker",
1036 );
1037 new_source_statistic_entries.insert(id);
1038 new_webhook_statistic_entries.insert(id);
1041 self.collection_manager
1047 .register_append_only_collection(id, write, false, None);
1048 }
1049 DataSource::IngestionExport {
1050 ingestion_id,
1051 details,
1052 data_config,
1053 } => {
1054 debug!(
1055 ?data_source, meta = ?metadata,
1056 "not registering {id} with a controller persist worker",
1057 );
1058 let ingestion_state = self
1060 .collections
1061 .get_mut(ingestion_id)
1062 .expect("known to exist");
1063
1064 let instance_id = match &mut ingestion_state.data_source {
1065 DataSource::Ingestion(ingestion_desc) => {
1066 ingestion_desc.source_exports.insert(
1067 id,
1068 SourceExport {
1069 storage_metadata: (),
1070 details: details.clone(),
1071 data_config: data_config.clone(),
1072 },
1073 );
1074
1075 ingestion_desc.instance_id
1080 }
1081 _ => unreachable!(
1082 "SourceExport must only refer to primary sources that already exist"
1083 ),
1084 };
1085
1086 to_execute.remove(&id);
1088 to_execute.insert(*ingestion_id);
1089
1090 let ingestion_state = IngestionState {
1091 read_capabilities: MutableAntichain::from(dependency_since.clone()),
1092 dependency_read_holds,
1093 derived_since: dependency_since,
1094 write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1095 hold_policy: ReadPolicy::step_back(),
1096 instance_id,
1097 hydrated_on: BTreeSet::new(),
1098 };
1099
1100 extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1101 maybe_instance_id = Some(instance_id);
1102
1103 new_source_statistic_entries.insert(id);
1104 }
1105 DataSource::Table { .. } => {
1106 debug!(
1107 ?data_source, meta = ?metadata,
1108 "registering {id} with persist table worker",
1109 );
1110 table_registers.push((id, write));
1111 }
1112 DataSource::Progress | DataSource::Other => {
1113 debug!(
1114 ?data_source, meta = ?metadata,
1115 "not registering {id} with a controller persist worker",
1116 );
1117 }
1118 DataSource::Ingestion(ingestion_desc) => {
1119 debug!(
1120 ?data_source, meta = ?metadata,
1121 "not registering {id} with a controller persist worker",
1122 );
1123
1124 let mut dependency_since = Antichain::from_elem(T::minimum());
1125 for read_hold in dependency_read_holds.iter() {
1126 dependency_since.join_assign(read_hold.since());
1127 }
1128
1129 let ingestion_state = IngestionState {
1130 read_capabilities: MutableAntichain::from(dependency_since.clone()),
1131 dependency_read_holds,
1132 derived_since: dependency_since,
1133 write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
1134 hold_policy: ReadPolicy::step_back(),
1135 instance_id: ingestion_desc.instance_id,
1136 hydrated_on: BTreeSet::new(),
1137 };
1138
1139 extra_state = CollectionStateExtra::Ingestion(ingestion_state);
1140 maybe_instance_id = Some(ingestion_desc.instance_id);
1141
1142 new_source_statistic_entries.insert(id);
1143 }
1144 DataSource::Sink { desc } => {
1145 let mut dependency_since = Antichain::from_elem(T::minimum());
1146 for read_hold in dependency_read_holds.iter() {
1147 dependency_since.join_assign(read_hold.since());
1148 }
1149
1150 let [self_hold, read_hold] =
1151 dependency_read_holds.try_into().expect("two holds");
1152
1153 let state = ExportState::new(
1154 desc.instance_id,
1155 read_hold,
1156 self_hold,
1157 write_frontier.clone(),
1158 ReadPolicy::step_back(),
1159 );
1160 maybe_instance_id = Some(state.cluster_id);
1161 extra_state = CollectionStateExtra::Export(state);
1162
1163 new_sink_statistic_entries.insert(id);
1164 }
1165 }
1166
1167 let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
1168 let collection_state =
1169 CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);
1170
1171 self.collections.insert(id, collection_state);
1172 }
1173
1174 {
1175 let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1176
1177 for id in new_webhook_statistic_entries {
1180 source_statistics.webhook_statistics.entry(id).or_default();
1181 }
1182
1183 }
1187
1188 if !table_registers.is_empty() {
1190 let register_ts = register_ts
1191 .expect("caller should have provided a register_ts when creating a table");
1192
1193 if self.read_only {
1194 table_registers
1204 .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));
1205
1206 self.persist_table_worker
1207 .register(register_ts, table_registers)
1208 .await
1209 .expect("table worker unexpectedly shut down");
1210 } else {
1211 self.persist_table_worker
1212 .register(register_ts, table_registers)
1213 .await
1214 .expect("table worker unexpectedly shut down");
1215 }
1216 }
1217
1218 self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);
1219
1220 for id in to_execute {
1222 match &self.collection(id)?.data_source {
1223 DataSource::Ingestion(ingestion) => {
1224 if !self.read_only
1225 || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
1226 && ingestion.desc.connection.supports_read_only())
1227 {
1228 self.run_ingestion(id)?;
1229 }
1230 }
1231 DataSource::IngestionExport { .. } => unreachable!(
1232 "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
1233 ),
1234 DataSource::Introspection(_)
1235 | DataSource::Webhook
1236 | DataSource::Table { .. }
1237 | DataSource::Progress
1238 | DataSource::Other => {}
1239 DataSource::Sink { .. } => {
1240 if !self.read_only {
1241 self.run_export(id)?;
1242 }
1243 }
1244 };
1245 }
1246
1247 Ok(())
1248 }
1249
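    /// Checks whether the provided source description is a compatible alteration of the
    /// existing ingestion's description.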
1250 fn check_alter_ingestion_source_desc(
1251 &mut self,
1252 ingestion_id: GlobalId,
1253 source_desc: &SourceDesc,
1254 ) -> Result<(), StorageError<Self::Timestamp>> {
1255 let source_collection = self.collection(ingestion_id)?;
1256 let data_source = &source_collection.data_source;
1257 match &data_source {
1258 DataSource::Ingestion(cur_ingestion) => {
1259 cur_ingestion
1260 .desc
1261 .alter_compatible(ingestion_id, source_desc)?;
1262 }
1263 o => {
1264 tracing::info!(
1265 "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1266 o
1267 );
1268 Err(AlterError { id: ingestion_id })?
1269 }
1270 }
1271
1272 Ok(())
1273 }
1274
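    /// Swaps the connections of the given ingestions and re-runs those whose connection
    /// actually changed.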
1275 async fn alter_ingestion_connections(
1276 &mut self,
1277 source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1278 ) -> Result<(), StorageError<Self::Timestamp>> {
1279 self.storage_collections
1281 .alter_ingestion_connections(source_connections.clone())
1282 .await?;
1283
1284 let mut ingestions_to_run = BTreeSet::new();
1285
1286 for (id, conn) in source_connections {
1287 let collection = self
1288 .collections
1289 .get_mut(&id)
1290 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1291
1292 match &mut collection.data_source {
1293 DataSource::Ingestion(ingestion) => {
1294 if ingestion.desc.connection != conn {
1297 tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1298 ingestion.desc.connection = conn;
1299 ingestions_to_run.insert(id);
1300 } else {
1301 tracing::warn!(
1302 "update_source_connection called on {id} but the \
1303 connection was the same"
1304 );
1305 }
1306 }
1307 o => {
1308 tracing::warn!("update_source_connection called on {:?}", o);
1309 Err(StorageError::IdentifierInvalid(id))?;
1310 }
1311 }
1312 }
1313
1314 for id in ingestions_to_run {
1315 self.run_ingestion(id)?;
1316 }
1317 Ok(())
1318 }
1319
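    /// Updates the data configs of the given source exports and re-runs the affected
    /// ingestions.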
1320 async fn alter_ingestion_export_data_configs(
1321 &mut self,
1322 source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1323 ) -> Result<(), StorageError<Self::Timestamp>> {
1324 self.storage_collections
1326 .alter_ingestion_export_data_configs(source_exports.clone())
1327 .await?;
1328
1329 let mut ingestions_to_run = BTreeSet::new();
1330
1331 for (source_export_id, new_data_config) in source_exports {
1332 let source_export_collection = self
1335 .collections
1336 .get_mut(&source_export_id)
1337 .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1338 let ingestion_id = match &mut source_export_collection.data_source {
1339 DataSource::IngestionExport {
1340 ingestion_id,
1341 details: _,
1342 data_config,
1343 } => {
1344 *data_config = new_data_config.clone();
1345 *ingestion_id
1346 }
1347 o => {
1348 tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1349 Err(StorageError::IdentifierInvalid(source_export_id))?
1350 }
1351 };
1352 let ingestion_collection = self
1355 .collections
1356 .get_mut(&ingestion_id)
1357 .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
1358
1359 match &mut ingestion_collection.data_source {
1360 DataSource::Ingestion(ingestion_desc) => {
1361 let source_export = ingestion_desc
1362 .source_exports
1363 .get_mut(&source_export_id)
1364 .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
1365
1366 if source_export.data_config != new_data_config {
1369 tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
1370 source_export.data_config = new_data_config;
1371
1372 ingestions_to_run.insert(ingestion_id);
1373 } else {
1374 tracing::warn!(
1375 "alter_ingestion_export_data_configs called on \
1376 export {source_export_id} of {ingestion_id} but \
1377 the data config was the same"
1378 );
1379 }
1380 }
1381 o => {
1382 tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
1383 Err(StorageError::IdentifierInvalid(ingestion_id))?
1384 }
1385 }
1386 }
1387
1388 for id in ingestions_to_run {
1389 self.run_ingestion(id)?;
1390 }
1391 Ok(())
1392 }
1393
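    /// Alters a table by registering a new collection (with the new description) backed by the
    /// same data shard, and pointing the existing collection at it as its primary.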
1394 async fn alter_table_desc(
1395 &mut self,
1396 existing_collection: GlobalId,
1397 new_collection: GlobalId,
1398 new_desc: RelationDesc,
1399 expected_version: RelationVersion,
1400 register_ts: Self::Timestamp,
1401 ) -> Result<(), StorageError<Self::Timestamp>> {
1402 let data_shard = {
1403 let Controller {
1404 collections,
1405 storage_collections,
1406 ..
1407 } = self;
1408
1409 let existing = collections
1410 .get(&existing_collection)
1411 .ok_or(StorageError::IdentifierMissing(existing_collection))?;
1412 if !matches!(existing.data_source, DataSource::Table { .. }) {
1413 return Err(StorageError::IdentifierInvalid(existing_collection));
1414 }
1415
1416 storage_collections
1418 .alter_table_desc(
1419 existing_collection,
1420 new_collection,
1421 new_desc.clone(),
1422 expected_version,
1423 )
1424 .await?;
1425
1426 existing.collection_metadata.data_shard.clone()
1427 };
1428
1429 let persist_client = self
1430 .persist
1431 .open(self.persist_location.clone())
1432 .await
1433 .expect("invalid persist location");
1434 let write_handle = self
1435 .open_data_handles(
1436 &existing_collection,
1437 data_shard,
1438 new_desc.clone(),
1439 &persist_client,
1440 )
1441 .await;
1442
1443 let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
1445 let collection_meta = CollectionMetadata {
1446 persist_location: self.persist_location.clone(),
1447 data_shard,
1448 relation_desc: new_desc.clone(),
1449 remap_shard: None,
1451 txns_shard: Some(self.txns_read.txns_id().clone()),
1452 };
1453 let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
1455 let collection_state = CollectionState::new(
1456 collection_desc.data_source.clone(),
1457 collection_meta,
1458 CollectionStateExtra::None,
1459 wallclock_lag_metrics,
1460 );
1461
1462 self.collections.insert(new_collection, collection_state);
1465 let existing = self
1466 .collections
1467 .get_mut(&existing_collection)
1468 .expect("missing existing collection");
1469 assert!(matches!(
1470 existing.data_source,
1471 DataSource::Table { primary: None }
1472 ));
1473 existing.data_source = DataSource::Table {
1474 primary: Some(new_collection),
1475 };
1476
1477 self.persist_table_worker
1478 .register(register_ts, vec![(new_collection, write_handle)])
1479 .await
1480 .expect("table worker unexpectedly shut down");
1481
1482 self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);
1483
1484 Ok(())
1485 }
1486
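    /// Returns a reference to the `ExportState` of the identified export.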
1487 fn export(
1488 &self,
1489 id: GlobalId,
1490 ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1491 self.collections
1492 .get(&id)
1493 .and_then(|c| match &c.extra_state {
1494 CollectionStateExtra::Export(state) => Some(state),
1495 _ => None,
1496 })
1497 .ok_or(StorageError::IdentifierMissing(id))
1498 }
1499
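    /// Returns a mutable reference to the `ExportState` of the identified export.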
1500 fn export_mut(
1501 &mut self,
1502 id: GlobalId,
1503 ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
1504 self.collections
1505 .get_mut(&id)
1506 .and_then(|c| match &mut c.extra_state {
1507 CollectionStateExtra::Export(state) => Some(state),
1508 _ => None,
1509 })
1510 .ok_or(StorageError::IdentifierMissing(id))
1511 }
1512
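    /// Starts a oneshot ingestion on the specified cluster, stashing `result_tx` so the result
    /// can be reported when the ingestion completes. Fails in read-only mode.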
1513 async fn create_oneshot_ingestion(
1515 &mut self,
1516 ingestion_id: uuid::Uuid,
1517 collection_id: GlobalId,
1518 instance_id: StorageInstanceId,
1519 request: OneshotIngestionRequest,
1520 result_tx: OneshotResultCallback<ProtoBatch>,
1521 ) -> Result<(), StorageError<Self::Timestamp>> {
1522 let collection_meta = self
1523 .collections
1524 .get(&collection_id)
1525 .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
1526 .collection_metadata
1527 .clone();
1528 let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1529 StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
1531 })?;
1532 let oneshot_cmd = RunOneshotIngestion {
1533 ingestion_id,
1534 collection_id,
1535 collection_meta,
1536 request,
1537 };
1538
1539 if !self.read_only {
1540 instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
1541 let pending = PendingOneshotIngestion {
1542 result_tx,
1543 cluster_id: instance_id,
1544 };
1545 let novel = self
1546 .pending_oneshot_ingestions
1547 .insert(ingestion_id, pending);
1548 assert_none!(novel);
1549 Ok(())
1550 } else {
1551 Err(StorageError::ReadOnly)
1552 }
1553 }
1554
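    /// Cancels a pending oneshot ingestion, notifying the cluster that runs it and invoking its
    /// result callback with a cancellation error.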
1555 fn cancel_oneshot_ingestion(
1556 &mut self,
1557 ingestion_id: uuid::Uuid,
1558 ) -> Result<(), StorageError<Self::Timestamp>> {
1559 if self.read_only {
1560 return Err(StorageError::ReadOnly);
1561 }
1562
1563 let pending = self
1564 .pending_oneshot_ingestions
1565 .remove(&ingestion_id)
1566 .ok_or_else(|| {
1567 StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
1569 })?;
1570
1571 match self.instances.get_mut(&pending.cluster_id) {
1572 Some(instance) => {
1573 instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
1574 }
1575 None => {
1576 mz_ore::soft_panic_or_log!(
1577 "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
1578 ingestion_id,
1579 pending.cluster_id,
1580 );
1581 }
1582 }
1583 pending.cancel();
1585
1586 Ok(())
1587 }
1588
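    /// Replaces the description of an existing export, acquiring fresh read holds on its input
    /// and re-running the sink on its instance.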
1589 async fn alter_export(
1590 &mut self,
1591 id: GlobalId,
1592 new_description: ExportDescription<Self::Timestamp>,
1593 ) -> Result<(), StorageError<Self::Timestamp>> {
1594 let from_id = new_description.sink.from;
1595
1596 let desired_read_holds = vec![from_id.clone(), id.clone()];
1599 let [input_hold, self_hold] = self
1600 .storage_collections
1601 .acquire_read_holds(desired_read_holds)
1602 .expect("missing dependency")
1603 .try_into()
1604 .expect("expected number of holds");
1605 let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
1606 let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1607
1608 let cur_export = self.export_mut(id)?;
1610 let input_readable = cur_export
1611 .write_frontier
1612 .iter()
1613 .all(|t| input_hold.since().less_than(t));
1614 if !input_readable {
1615 return Err(StorageError::ReadBeforeSince(from_id));
1616 }
1617
1618 let new_export = ExportState {
1619 read_capabilities: cur_export.read_capabilities.clone(),
1620 cluster_id: new_description.instance_id,
1621 derived_since: cur_export.derived_since.clone(),
1622 read_holds: [input_hold, self_hold],
1623 read_policy: cur_export.read_policy.clone(),
1624 write_frontier: cur_export.write_frontier.clone(),
1625 };
1626 *cur_export = new_export;
1627
1628 let cmd = RunSinkCommand {
1629 id,
1630 description: StorageSinkDesc {
1631 from: from_id,
1632 from_desc: new_description.sink.from_desc,
1633 connection: new_description.sink.connection,
1634 envelope: new_description.sink.envelope,
1635 as_of: new_description.sink.as_of,
1636 version: new_description.sink.version,
1637 from_storage_metadata,
1638 with_snapshot: new_description.sink.with_snapshot,
1639 to_storage_metadata,
1640 },
1641 };
1642
1643 let instance = self
1645 .instances
1646 .get_mut(&new_description.instance_id)
1647 .ok_or_else(|| StorageError::ExportInstanceMissing {
1648 storage_instance_id: new_description.instance_id,
1649 export_id: id,
1650 })?;
1651
1652 instance.send(StorageCommand::RunSink(Box::new(cmd)));
1653 Ok(())
1654 }
1655
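    /// Swaps the connections of the given exports and re-runs the corresponding sinks on their
    /// instances.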
1656 async fn alter_export_connections(
1658 &mut self,
1659 exports: BTreeMap<GlobalId, StorageSinkConnection>,
1660 ) -> Result<(), StorageError<Self::Timestamp>> {
1661 let mut updates_by_instance =
1662 BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();
1663
1664 for (id, connection) in exports {
1665 let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
1673 let export = &self.collections[&id];
1674 let DataSource::Sink { desc } = &export.data_source else {
1675 panic!("export exists")
1676 };
1677 let CollectionStateExtra::Export(state) = &export.extra_state else {
1678 panic!("export exists")
1679 };
1680 let export_description = desc.clone();
1681 let as_of = state.input_hold().since().clone();
1682
1683 (export_description, as_of)
1684 };
1685 let current_sink = new_export_description.sink.clone();
1686
1687 new_export_description.sink.connection = connection;
1688
1689 current_sink.alter_compatible(id, &new_export_description.sink)?;
1691
1692 let from_storage_metadata = self
1693 .storage_collections
1694 .collection_metadata(new_export_description.sink.from)?;
1695 let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
1696
1697 let cmd = RunSinkCommand {
1698 id,
1699 description: StorageSinkDesc {
1700 from: new_export_description.sink.from,
1701 from_desc: new_export_description.sink.from_desc.clone(),
1702 connection: new_export_description.sink.connection.clone(),
1703 envelope: new_export_description.sink.envelope,
1704 with_snapshot: new_export_description.sink.with_snapshot,
1705 version: new_export_description.sink.version,
1706 as_of: as_of.to_owned(),
1717 from_storage_metadata,
1718 to_storage_metadata,
1719 },
1720 };
1721
1722 let update = updates_by_instance
1723 .entry(new_export_description.instance_id)
1724 .or_default();
1725 update.push((cmd, new_export_description));
1726 }
1727
1728 for (instance_id, updates) in updates_by_instance {
1729 let mut export_updates = BTreeMap::new();
1730 let mut cmds = Vec::with_capacity(updates.len());
1731
1732 for (cmd, export_state) in updates {
1733 export_updates.insert(cmd.id, export_state);
1734 cmds.push(cmd);
1735 }
1736
1737 let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
1739 StorageError::ExportInstanceMissing {
1740 storage_instance_id: instance_id,
1741 export_id: *export_updates
1742 .keys()
1743 .next()
1744 .expect("set of exports not empty"),
1745 }
1746 })?;
1747
1748 for cmd in cmds {
1749 instance.send(StorageCommand::RunSink(Box::new(cmd)));
1750 }
1751
1752 for (id, new_export_description) in export_updates {
1754 let Some(state) = self.collections.get_mut(&id) else {
1755 panic!("export known to exist")
1756 };
1757 let DataSource::Sink { desc } = &mut state.data_source else {
1758 panic!("export known to exist")
1759 };
1760 *desc = new_export_description;
1761 }
1762 }
1763
1764 Ok(())
1765 }
1766
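    /// Drops the given tables, releasing the table worker's write handles at `ts`; identifiers
    /// backed by sources (ingestion exports, webhooks) are dropped via the source path instead.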
1767 fn drop_tables(
1782 &mut self,
1783 storage_metadata: &StorageMetadata,
1784 identifiers: Vec<GlobalId>,
1785 ts: Self::Timestamp,
1786 ) -> Result<(), StorageError<Self::Timestamp>> {
1787 let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1789 .into_iter()
1790 .partition(|id| match self.collections[id].data_source {
1791 DataSource::Table { .. } => true,
1792 DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1793 _ => panic!("identifier is not a table: {}", id),
1794 });
1795
1796 if table_write_ids.len() > 0 {
1798 let drop_notif = self
1799 .persist_table_worker
1800 .drop_handles(table_write_ids.clone(), ts);
1801 let tx = self.pending_table_handle_drops_tx.clone();
1802 mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1803 drop_notif.await;
1804 for identifier in table_write_ids {
1805 let _ = tx.send(identifier);
1806 }
1807 });
1808 }
1809
1810 if data_source_ids.len() > 0 {
1812 self.validate_collection_ids(data_source_ids.iter().cloned())?;
1813 self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1814 }
1815
1816 Ok(())
1817 }
1818
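    /// Drops the given source collections after validating that they exist.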
1819 fn drop_sources(
1820 &mut self,
1821 storage_metadata: &StorageMetadata,
1822 identifiers: Vec<GlobalId>,
1823 ) -> Result<(), StorageError<Self::Timestamp>> {
1824 self.validate_collection_ids(identifiers.iter().cloned())?;
1825 self.drop_sources_unvalidated(storage_metadata, identifiers)
1826 }
1827
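    /// Drops the given sources without validating that they exist, removing their collection
    /// state, releasing read holds, and recording `Dropped` status updates.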
1828 fn drop_sources_unvalidated(
1829 &mut self,
1830 storage_metadata: &StorageMetadata,
1831 ids: Vec<GlobalId>,
1832 ) -> Result<(), StorageError<Self::Timestamp>> {
1833 let mut ingestions_to_execute = BTreeSet::new();
1836 let mut ingestions_to_drop = BTreeSet::new();
1837 let mut source_statistics_to_drop = Vec::new();
1838
1839 let mut collections_to_drop = Vec::new();
1843
1844 for id in ids.iter() {
1845 let metadata = storage_metadata.get_collection_shard::<T>(*id);
1846 mz_ore::soft_assert_or_log!(
1847 matches!(metadata, Err(StorageError::IdentifierMissing(_))),
1848 "dropping {id}, but drop was not synchronized with storage \
1849 controller via `synchronize_collections`"
1850 );
1851
1852 let collection_state = self.collections.get(id);
1853
1854 if let Some(collection_state) = collection_state {
1855 match collection_state.data_source {
1856 DataSource::Webhook => {
1857 let fut = self.collection_manager.unregister_collection(*id);
1860 mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);
1861
1862 collections_to_drop.push(*id);
1863 source_statistics_to_drop.push(*id);
1864 }
1865 DataSource::Ingestion(_) => {
1866 ingestions_to_drop.insert(*id);
1867 source_statistics_to_drop.push(*id);
1868 }
1869 DataSource::IngestionExport { ingestion_id, .. } => {
1870 ingestions_to_execute.insert(ingestion_id);
1877
1878 let ingestion_state = match self.collections.get_mut(&ingestion_id) {
1880 Some(ingestion_collection) => ingestion_collection,
1881 None => {
1883 tracing::error!(
1884 "primary source {ingestion_id} seemingly dropped before subsource {id}"
1885 );
1886 continue;
1887 }
1888 };
1889
1890 match &mut ingestion_state.data_source {
1891 DataSource::Ingestion(ingestion_desc) => {
1892 let removed = ingestion_desc.source_exports.remove(id);
1893 mz_ore::soft_assert_or_log!(
1894 removed.is_some(),
1895 "dropped subsource {id} already removed from source exports"
1896 );
1897 }
1898 _ => unreachable!(
1899 "SourceExport must only refer to primary sources that already exist"
1900 ),
1901 };
1902
1903 ingestions_to_drop.insert(*id);
1907 source_statistics_to_drop.push(*id);
1908 }
1909 DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
1910 collections_to_drop.push(*id);
1911 }
1912 DataSource::Introspection(_) | DataSource::Sink { .. } => {
1913 soft_panic_or_log!(
1916 "drop_sources called on a {:?} (id={id}))",
1917 collection_state.data_source,
1918 );
1919 }
1920 }
1921 }
1922 }
1923
1924 ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
1926 for ingestion_id in ingestions_to_execute {
1927 self.run_ingestion(ingestion_id)?;
1928 }
1929
1930 let ingestion_policies = ingestions_to_drop
1937 .iter()
1938 .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
1939 .collect();
1940
1941 tracing::debug!(
1942 ?ingestion_policies,
1943 "dropping sources by setting read hold policies"
1944 );
1945 self.set_hold_policies(ingestion_policies);
1946
1947 let shards_to_update: BTreeSet<_> = ingestions_to_drop
1949 .iter()
1950 .chain(collections_to_drop.iter())
1951 .cloned()
1952 .collect();
1953 self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);
1954
1955 let status_now = mz_ore::now::to_datetime((self.now)());
1956 let mut status_updates = vec![];
1957 for id in ingestions_to_drop.iter() {
1958 status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
1959 }
1960
1961 if !self.read_only {
1962 self.append_status_introspection_updates(
1963 IntrospectionType::SourceStatusHistory,
1964 status_updates,
1965 );
1966 }
1967
1968 {
1969 let mut source_statistics = self.source_statistics.lock().expect("poisoned");
1970 for id in source_statistics_to_drop {
1971 source_statistics
1972 .source_statistics
1973 .retain(|(stats_id, _), _| stats_id != &id);
1974 source_statistics
1975 .webhook_statistics
1976 .retain(|stats_id, _| stats_id != &id);
1977 }
1978 }
1979
1980 for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
1982 tracing::info!(%id, "dropping collection state");
1983 let collection = self
1984 .collections
1985 .remove(id)
1986 .expect("list populated after checking that self.collections contains it");
1987
1988 let instance = match &collection.extra_state {
1989 CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
1990 CollectionStateExtra::Export(export) => Some(export.cluster_id()),
1991 CollectionStateExtra::None => None,
1992 }
1993 .and_then(|i| self.instances.get(&i));
1994
1995 if let Some(instance) = instance {
1999 let active_replicas = instance.get_active_replicas_for_object(id);
2000 if !active_replicas.is_empty() {
2001 match &collection.data_source {
2008 DataSource::Ingestion(ingestion_desc) => {
2009 self.dropped_objects.insert(
2010 ingestion_desc.remap_collection_id,
2011 active_replicas.clone(),
2012 );
2013 }
2014 _ => {}
2015 }
2016
2017 self.dropped_objects.insert(*id, active_replicas);
2018 }
2019 }
2020 }
2021
2022 self.storage_collections
2024 .drop_collections_unvalidated(storage_metadata, ids);
2025
2026 Ok(())
2027 }
2028
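    /// Drops the given sinks after validating that they exist.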
2029 fn drop_sinks(
2031 &mut self,
2032 storage_metadata: &StorageMetadata,
2033 identifiers: Vec<GlobalId>,
2034 ) -> Result<(), StorageError<Self::Timestamp>> {
2035 self.validate_export_ids(identifiers.iter().cloned())?;
2036 self.drop_sinks_unvalidated(storage_metadata, identifiers);
2037 Ok(())
2038 }
2039
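    /// Drops the given sinks without validating that they exist, releasing their read holds and
    /// recording `Dropped` status updates.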
2040 fn drop_sinks_unvalidated(
2041 &mut self,
2042 storage_metadata: &StorageMetadata,
2043 mut sinks_to_drop: Vec<GlobalId>,
2044 ) {
2045 sinks_to_drop.retain(|id| self.export(*id).is_ok());
2047
2048 let drop_policy = sinks_to_drop
2055 .iter()
2056 .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
2057 .collect();
2058
        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
2063 self.set_hold_policies(drop_policy);
2064
2065 let status_now = mz_ore::now::to_datetime((self.now)());
2072
2073 let mut status_updates = vec![];
2075 {
2076 let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
2077 for id in sinks_to_drop.iter() {
2078 status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
2079 sink_statistics.retain(|(stats_id, _), _| stats_id != id);
2080 }
2081 }
2082
2083 if !self.read_only {
2084 self.append_status_introspection_updates(
2085 IntrospectionType::SinkStatusHistory,
2086 status_updates,
2087 );
2088 }
2089
2090 for id in sinks_to_drop.iter() {
2092 tracing::info!(%id, "dropping export state");
2093 let collection = self
2094 .collections
2095 .remove(id)
2096 .expect("list populated after checking that self.collections contains it");
2097
2098 let instance = match &collection.extra_state {
2099 CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
2100 CollectionStateExtra::Export(export) => Some(export.cluster_id()),
2101 CollectionStateExtra::None => None,
2102 }
2103 .and_then(|i| self.instances.get(&i));
2104
2105 if let Some(instance) = instance {
2109 let active_replicas = instance.get_active_replicas_for_object(id);
2110 if !active_replicas.is_empty() {
2111 self.dropped_objects.insert(*id, active_replicas);
2112 }
2113 }
2114 }
2115
2116 self.storage_collections
2118 .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
2119 }
2120
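    /// Appends the given table updates at `write_ts` and advances the tables' uppers to
    /// `advance_to`. In read-only mode this is only permitted for migrated system collections.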
2121 #[instrument(level = "debug")]
2122 fn append_table(
2123 &mut self,
2124 write_ts: Self::Timestamp,
2125 advance_to: Self::Timestamp,
2126 commands: Vec<(GlobalId, Vec<TableData>)>,
2127 ) -> Result<
2128 tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
2129 StorageError<Self::Timestamp>,
2130 > {
2131 if self.read_only {
2132 if !commands
2135 .iter()
2136 .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2137 {
2138 return Err(StorageError::ReadOnly);
2139 }
2140 }
2141
2142 for (id, updates) in commands.iter() {
2144 if !updates.is_empty() {
2145 if !write_ts.less_than(&advance_to) {
2146 return Err(StorageError::UpdateBeyondUpper(*id));
2147 }
2148 }
2149 }
2150
2151 Ok(self
2152 .persist_table_worker
2153 .append(write_ts, advance_to, commands))
2154 }
2155
2156 fn monotonic_appender(
2157 &self,
2158 id: GlobalId,
2159 ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
2160 self.collection_manager.monotonic_appender(id)
2161 }
2162
2163 fn webhook_statistics(
2164 &self,
2165 id: GlobalId,
2166 ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
2167 let source_statistics = self.source_statistics.lock().expect("poisoned");
2169 source_statistics
2170 .webhook_statistics
2171 .get(&id)
2172 .cloned()
2173 .ok_or(StorageError::IdentifierMissing(id))
2174 }
2175
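    /// Waits until the controller is ready to be processed, i.e. until there are stashed
    /// responses from instances, pending table-handle drops, or scheduled maintenance.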
2176 async fn ready(&mut self) {
2177 if self.maintenance_scheduled {
2178 return;
2179 }
2180
2181 if !self.pending_table_handle_drops_rx.is_empty() {
2182 return;
2183 }
2184
2185 tokio::select! {
2186 Some(m) = self.instance_response_rx.recv() => {
2187 self.stashed_responses.push(m);
2188 while let Ok(m) = self.instance_response_rx.try_recv() {
2189 self.stashed_responses.push(m);
2190 }
2191 }
2192 _ = self.maintenance_ticker.tick() => {
2193 self.maintenance_scheduled = true;
2194 },
2195 };
2196 }
2197
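    /// Processes the work queued up by `ready`: runs maintenance, rehydrates failed replicas,
    /// and applies stashed instance responses such as frontier, statistics, and status updates.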
2198 #[instrument(level = "debug")]
2199 fn process(
2200 &mut self,
2201 storage_metadata: &StorageMetadata,
2202 ) -> Result<Option<Response<T>>, anyhow::Error> {
2203 if self.maintenance_scheduled {
2205 self.maintain();
2206 self.maintenance_scheduled = false;
2207 }
2208
2209 for instance in self.instances.values_mut() {
2210 instance.rehydrate_failed_replicas();
2211 }
2212
2213 let mut status_updates = vec![];
2214 let mut updated_frontiers = BTreeMap::new();
2215
2216 let stashed_responses = std::mem::take(&mut self.stashed_responses);
2218 for resp in stashed_responses {
2219 match resp {
2220 (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
2221 self.update_write_frontier(id, &upper);
2222 updated_frontiers.insert(id, upper);
2223 }
2224 (replica_id, StorageResponse::DroppedId(id)) => {
2225 let replica_id = replica_id.expect("DroppedId from unknown replica");
2226 if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
2227 remaining_replicas.remove(&replica_id);
2228 if remaining_replicas.is_empty() {
2229 self.dropped_objects.remove(&id);
2230 }
2231 } else {
2232 soft_panic_or_log!("unexpected DroppedId for {id}");
2233 }
2234 }
2235 (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
2236 {
2238 let replica_id = if let Some(replica_id) = replica_id {
2245 replica_id
2246 } else {
2247 tracing::error!(
2248 ?source_stats,
2249 "missing replica_id for source statistics update"
2250 );
2251 continue;
2252 };
2253
2254 let mut shared_stats = self.source_statistics.lock().expect("poisoned");
2255
2256 for stat in source_stats {
2257 let collection_id = stat.id.clone();
2258
2259 if self.collection(collection_id).is_err() {
2260 continue;
2263 }
2264
2265 let entry = shared_stats
2266 .source_statistics
2267 .entry((stat.id, Some(replica_id)));
2268
2269 match entry {
2270 btree_map::Entry::Vacant(vacant_entry) => {
2271 let mut stats = ControllerSourceStatistics::new(
2272 collection_id,
2273 Some(replica_id),
2274 );
2275 stats.incorporate(stat);
2276 vacant_entry.insert(stats);
2277 }
2278 btree_map::Entry::Occupied(mut occupied_entry) => {
2279 occupied_entry.get_mut().incorporate(stat);
2280 }
2281 }
2282 }
2283 }
2284
2285 {
2286 let replica_id = if let Some(replica_id) = replica_id {
2297 replica_id
2298 } else {
2299 tracing::error!(
2300 ?sink_stats,
2301 "missing replica_id for sink statistics update"
2302 );
2303 continue;
2304 };
2305
2306 let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
2307
2308 for stat in sink_stats {
2309 let collection_id = stat.id.clone();
2310
2311 if self.collection(collection_id).is_err() {
2312 continue;
2315 }
2316
2317 let entry = shared_stats.entry((stat.id, Some(replica_id)));
2318
2319 match entry {
2320 btree_map::Entry::Vacant(vacant_entry) => {
2321 let mut stats =
2322 ControllerSinkStatistics::new(collection_id, replica_id);
2323 stats.incorporate(stat);
2324 vacant_entry.insert(stats);
2325 }
2326 btree_map::Entry::Occupied(mut occupied_entry) => {
2327 occupied_entry.get_mut().incorporate(stat);
2328 }
2329 }
2330 }
2331 }
2332 }
2333 (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
2334 match status_update.status {
2350 Status::Running => {
2351 let collection = self.collections.get_mut(&status_update.id);
2352 match collection {
2353 Some(collection) => {
2354 match collection.extra_state {
2355 CollectionStateExtra::Ingestion(
2356 ref mut ingestion_state,
2357 ) => {
2358 if ingestion_state.hydrated_on.is_empty() {
2359 tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
2360 }
2361 ingestion_state.hydrated_on.insert(replica_id.expect(
2362 "replica id should be present for status running",
2363 ));
2364 }
                                    CollectionStateExtra::Export(_) => (),
                                    CollectionStateExtra::None => (),
                                }
                            }
                            None => (),
                        }
2376 }
2377 Status::Paused => {
2378 let collection = self.collections.get_mut(&status_update.id);
2379 match collection {
2380 Some(collection) => {
2381 match collection.extra_state {
2382 CollectionStateExtra::Ingestion(
2383 ref mut ingestion_state,
2384 ) => {
2385 tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
2392 ingestion_state.hydrated_on.clear();
2393 }
                                    CollectionStateExtra::Export(_) => (),
                                    CollectionStateExtra::None => (),
                                }
                            }
                            None => (),
                        }
2405 }
2406 _ => (),
2407 }
2408
2409 if let Some(id) = replica_id {
2411 status_update.replica_id = Some(id);
2412 }
2413 status_updates.push(status_update);
2414 }
2415 (_replica_id, StorageResponse::StagedBatches(batches)) => {
2416 for (ingestion_id, batches) in batches {
2417 match self.pending_oneshot_ingestions.remove(&ingestion_id) {
2418 Some(pending) => {
2419 if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
2422 {
2423 instance
2424 .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
2425 }
2426 (pending.result_tx)(batches)
2428 }
                            None => (),
2433 }
2434 }
2435 }
2436 }
2437 }
2438
2439 self.record_status_updates(status_updates);
2440
2441 let mut dropped_table_ids = Vec::new();
2443 while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
2444 dropped_table_ids.push(dropped_id);
2445 }
2446 if !dropped_table_ids.is_empty() {
2447 self.drop_sources(storage_metadata, dropped_table_ids)?;
2448 }
2449
2450 if updated_frontiers.is_empty() {
2451 Ok(None)
2452 } else {
2453 Ok(Some(Response::FrontierUpdates(
2454 updated_frontiers.into_iter().collect(),
2455 )))
2456 }
2457 }
2458
2459 async fn inspect_persist_state(
2460 &self,
2461 id: GlobalId,
2462 ) -> Result<serde_json::Value, anyhow::Error> {
2463 let collection = &self.storage_collections.collection_metadata(id)?;
2464 let client = self
2465 .persist
2466 .open(collection.persist_location.clone())
2467 .await?;
2468 let shard_state = client
2469 .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2470 .await?;
2471 let json_state = serde_json::to_value(shard_state)?;
2472 Ok(json_state)
2473 }
2474
2475 fn append_introspection_updates(
2476 &mut self,
2477 type_: IntrospectionType,
2478 updates: Vec<(Row, Diff)>,
2479 ) {
2480 let id = self.introspection_ids[&type_];
2481 let updates = updates.into_iter().map(|update| update.into()).collect();
2482 self.collection_manager.blind_write(id, updates);
2483 }
2484
2485 fn append_status_introspection_updates(
2486 &mut self,
2487 type_: IntrospectionType,
2488 updates: Vec<StatusUpdate>,
2489 ) {
2490 let id = self.introspection_ids[&type_];
2491 let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2492 if !updates.is_empty() {
2493 self.collection_manager.blind_write(id, updates);
2494 }
2495 }
2496
2497 fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2498 let id = self.introspection_ids[&type_];
2499 self.collection_manager.differential_write(id, op);
2500 }
2501
2502 fn append_only_introspection_tx(
2503 &self,
2504 type_: IntrospectionType,
2505 ) -> mpsc::UnboundedSender<(
2506 Vec<AppendOnlyUpdate>,
2507 oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2508 )> {
2509 let id = self.introspection_ids[&type_];
2510 self.collection_manager.append_only_write_sender(id)
2511 }
2512
2513 fn differential_introspection_tx(
2514 &self,
2515 type_: IntrospectionType,
2516 ) -> mpsc::UnboundedSender<(
2517 StorageWriteOp,
2518 oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2519 )> {
2520 let id = self.introspection_ids[&type_];
2521 self.collection_manager.differential_write_sender(id)
2522 }
2523
2524 async fn real_time_recent_timestamp(
2525 &self,
2526 timestamp_objects: BTreeSet<GlobalId>,
2527 timeout: Duration,
2528 ) -> Result<
2529 BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
2530 StorageError<Self::Timestamp>,
2531 > {
2532 use mz_storage_types::sources::GenericSourceConnection;
2533
2534 let mut rtr_futures = BTreeMap::new();
2535
2536 for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
2538 let collection = match self.collection(id) {
2539 Ok(c) => c,
2540 Err(_) => continue,
2542 };
2543
2544 let (source_conn, remap_id) = match &collection.data_source {
2545 DataSource::Ingestion(IngestionDescription {
2546 desc: SourceDesc { connection, .. },
2547 remap_collection_id,
2548 ..
2549 }) => match connection {
2550 GenericSourceConnection::Kafka(_)
2551 | GenericSourceConnection::Postgres(_)
2552 | GenericSourceConnection::MySql(_)
2553 | GenericSourceConnection::SqlServer(_) => {
2554 (connection.clone(), *remap_collection_id)
2555 }
2556
2557 GenericSourceConnection::LoadGenerator(_) => continue,
2562 },
2563 _ => {
2565 continue;
2566 }
2567 };
2568
2569 let config = self.config().clone();
2571
2572 let read_handle = self.read_handle_for_snapshot(remap_id).await?;
2580
2581 let remap_read_hold = self
2584 .storage_collections
2585 .acquire_read_holds(vec![remap_id])
2586 .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
2587 .expect_element(|| "known to be exactly one");
2588
2589 let remap_as_of = remap_read_hold
2590 .since()
2591 .to_owned()
2592 .into_option()
2593 .ok_or(StorageError::ReadBeforeSince(remap_id))?;
2594
2595 rtr_futures.insert(
2596 id,
2597 tokio::time::timeout(timeout, async move {
2598 use mz_storage_types::sources::SourceConnection as _;
2599
2600 let as_of = Antichain::from_elem(remap_as_of);
2603 let remap_subscribe = read_handle
2604 .subscribe(as_of.clone())
2605 .await
2606 .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;
2607
2608 tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");
2609
2610 let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
2611 .await.map_err(|e| {
2612 tracing::debug!(?id, "real time recency error: {:?}", e);
2613 e
2614 });
2615
2616 drop(remap_read_hold);
2618
2619 result
2620 }),
2621 );
2622 }
2623
2624 Ok(Box::pin(async move {
2625 let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
2626 ids.into_iter()
2627 .zip_eq(futures::future::join_all(futs).await)
2628 .try_fold(T::minimum(), |curr, (id, per_source_res)| {
2629 let new =
2630 per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
2631 Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
2632 })
2633 }))
2634 }
2635}
2636
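/// Ensures the storage transaction records a txn-WAL shard ID, allocating a
/// fresh `ShardId` if none is present yet. This must run before a
/// `Controller` is created, which expects the shard ID to exist.
///
/// A minimal sketch of the expected ordering (surrounding setup is elided and
/// illustrative only):
///
/// ```ignore
/// prepare_initialization(&mut txn)?;
/// let controller = Controller::new(/* ..., */ &txn, storage_collections).await;
/// ```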
2637pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2644 if txn.get_txn_wal_shard().is_none() {
2645 let txns_id = ShardId::new();
2646 txn.write_txn_wal_shard(txns_id)?;
2647 }
2648
2649 Ok(())
2650}
2651
2652impl<T> Controller<T>
2653where
2654 T: Timestamp
2655 + Lattice
2656 + TotalOrder
2657 + Codec64
2658 + From<EpochMillis>
2659 + TimestampManipulation
2660 + Into<Datum<'static>>,
2661 StorageCommand<T>: RustType<ProtoStorageCommand>,
2662 StorageResponse<T>: RustType<ProtoStorageResponse>,
2663 Self: StorageController<Timestamp = T>,
2664{
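    /// Creates a new storage controller. Opens the txn-WAL shard recorded in
    /// `txn` (in read-only or writable mode depending on `read_only`), starts
    /// warming persist state for the known collections in the background, and
    /// initializes all controller bookkeeping.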
2665 pub async fn new(
2673 build_info: &'static BuildInfo,
2674 persist_location: PersistLocation,
2675 persist_clients: Arc<PersistClientCache>,
2676 now: NowFn,
2677 wallclock_lag: WallclockLagFn<T>,
2678 txns_metrics: Arc<TxnMetrics>,
2679 read_only: bool,
2680 metrics_registry: &MetricsRegistry,
2681 controller_metrics: ControllerMetrics,
2682 connection_context: ConnectionContext,
2683 txn: &dyn StorageTxn<T>,
2684 storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
2685 ) -> Self {
2686 let txns_client = persist_clients
2687 .open(persist_location.clone())
2688 .await
2689 .expect("location should be valid");
2690
2691 let persist_warm_task = warm_persist_state_in_background(
2692 txns_client.clone(),
2693 txn.get_collection_metadata().into_values(),
2694 );
2695 let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2696
2697 let txns_id = txn
2701 .get_txn_wal_shard()
2702 .expect("must call prepare initialization before creating storage controller");
2703
2704 let persist_table_worker = if read_only {
2705 let txns_write = txns_client
2706 .open_writer(
2707 txns_id,
2708 Arc::new(TxnsCodecRow::desc()),
2709 Arc::new(UnitSchema),
2710 Diagnostics {
2711 shard_name: "txns".to_owned(),
2712 handle_purpose: "follow txns upper".to_owned(),
2713 },
2714 )
2715 .await
2716 .expect("txns schema shouldn't change");
2717 persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2718 } else {
2719 let txns = TxnsHandle::open(
2720 T::minimum(),
2721 txns_client.clone(),
2722 txns_client.dyncfgs().clone(),
2723 Arc::clone(&txns_metrics),
2724 txns_id,
2725 )
2726 .await;
2727 persist_handles::PersistTableWriteWorker::new_txns(txns)
2728 };
2729 let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2730
2731 let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2732
2733 let introspection_ids = BTreeMap::new();
2734 let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2735
2736 let (statistics_interval_sender, _) =
2737 channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2738
2739 let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2740 tokio::sync::mpsc::unbounded_channel();
2741
2742 let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2743 maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2744
2745 let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2746
2747 let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2748
2749 let now_dt = mz_ore::now::to_datetime(now());
2750
2751 Self {
2752 build_info,
2753 collections: BTreeMap::default(),
2754 dropped_objects: Default::default(),
2755 persist_table_worker,
2756 txns_read,
2757 txns_metrics,
2758 stashed_responses: vec![],
2759 pending_table_handle_drops_tx,
2760 pending_table_handle_drops_rx,
2761 pending_oneshot_ingestions: BTreeMap::default(),
2762 collection_manager,
2763 introspection_ids,
2764 introspection_tokens,
2765 now,
2766 read_only,
2767 source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2768 source_statistics: BTreeMap::new(),
2769 webhook_statistics: BTreeMap::new(),
2770 })),
2771 sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2772 statistics_interval_sender,
2773 instances: BTreeMap::new(),
2774 initialized: false,
2775 config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2776 persist_location,
2777 persist: persist_clients,
2778 metrics,
2779 recorded_frontiers: BTreeMap::new(),
2780 recorded_replica_frontiers: BTreeMap::new(),
2781 wallclock_lag,
2782 wallclock_lag_last_recorded: now_dt,
2783 storage_collections,
2784 migrated_storage_collections: BTreeSet::new(),
2785 maintenance_ticker,
2786 maintenance_scheduled: false,
2787 instance_response_rx,
2788 instance_response_tx,
2789 persist_warm_task,
2790 }
2791 }
2792
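    /// Installs new read hold policies for the given ingestions and exports
    /// and applies any read capability changes that result from re-deriving
    /// their sinces against the current write frontiers.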
2793 #[instrument(level = "debug")]
2801 fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
2802 let mut read_capability_changes = BTreeMap::default();
2803
2804 for (id, policy) in policies.into_iter() {
2805 if let Some(collection) = self.collections.get_mut(&id) {
2806 let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2807 {
2808 CollectionStateExtra::Ingestion(ingestion) => (
2809 ingestion.write_frontier.borrow(),
2810 &mut ingestion.derived_since,
2811 &mut ingestion.hold_policy,
2812 ),
2813 CollectionStateExtra::None => {
2814 unreachable!("set_hold_policies is only called for ingestions");
2815 }
2816 CollectionStateExtra::Export(export) => (
2817 export.write_frontier.borrow(),
2818 &mut export.derived_since,
2819 &mut export.read_policy,
2820 ),
2821 };
2822
2823 let new_derived_since = policy.frontier(write_frontier);
2824 let mut update = swap_updates(derived_since, new_derived_since);
2825 if !update.is_empty() {
2826 read_capability_changes.insert(id, update);
2827 }
2828
2829 *hold_policy = policy;
2830 }
2831 }
2832
2833 if !read_capability_changes.is_empty() {
2834 self.update_hold_capabilities(&mut read_capability_changes);
2835 }
2836 }
2837
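    /// Records a new write frontier for `id` and, if the collection's hold
    /// policy yields a later derived since, downgrades read capabilities
    /// accordingly.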
2838 #[instrument(level = "debug", fields(updates))]
2839 fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2840 let mut read_capability_changes = BTreeMap::default();
2841
2842 if let Some(collection) = self.collections.get_mut(&id) {
2843 let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2844 CollectionStateExtra::Ingestion(ingestion) => (
2845 &mut ingestion.write_frontier,
2846 &mut ingestion.derived_since,
2847 &ingestion.hold_policy,
2848 ),
2849 CollectionStateExtra::None => {
                    // Progress collections have no extra state, so upper
                    // updates for them are expected here.
                    if !matches!(collection.data_source, DataSource::Progress) {
2853 tracing::error!(
2854 ?collection,
2855 ?new_upper,
2856 "updated write frontier for collection which is not an ingestion"
2857 );
2858 }
2859 return;
2860 }
2861 CollectionStateExtra::Export(export) => (
2862 &mut export.write_frontier,
2863 &mut export.derived_since,
2864 &export.read_policy,
2865 ),
2866 };
2867
2868 if PartialOrder::less_than(write_frontier, new_upper) {
2869 write_frontier.clone_from(new_upper);
2870 }
2871
2872 let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2873 let mut update = swap_updates(derived_since, new_derived_since);
2874 if !update.is_empty() {
2875 read_capability_changes.insert(id, update);
2876 }
2877 } else if self.dropped_objects.contains_key(&id) {
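            // Frontier updates can still arrive from replicas for objects that
            // were already dropped; they are expected and ignored.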
2878 } else {
2881 soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2882 }
2883
2884 if !read_capability_changes.is_empty() {
2885 self.update_hold_capabilities(&mut read_capability_changes);
2886 }
2887 }
2888
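    /// Applies a batch of read capability changes. Changes are consolidated
    /// per collection, dependency read holds are downgraded to the new
    /// frontier, and an `AllowCompaction` command is sent to the owning
    /// instance.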
2889 #[instrument(level = "debug", fields(updates))]
2893 fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2894 let mut collections_net = BTreeMap::new();
2896
2897 while let Some(key) = updates.keys().rev().next().cloned() {
2902 let mut update = updates.remove(&key).unwrap();
2903
2904 if key.is_user() {
2905 debug!(id = %key, ?update, "update_hold_capability");
2906 }
2907
2908 if let Some(collection) = self.collections.get_mut(&key) {
2909 match &mut collection.extra_state {
2910 CollectionStateExtra::Ingestion(ingestion) => {
2911 let changes = ingestion.read_capabilities.update_iter(update.drain());
2912 update.extend(changes);
2913
2914 let (changes, frontier, _cluster_id) =
2915 collections_net.entry(key).or_insert_with(|| {
2916 (
2917 <ChangeBatch<_>>::new(),
2918 Antichain::new(),
2919 ingestion.instance_id,
2920 )
2921 });
2922
2923 changes.extend(update.drain());
2924 *frontier = ingestion.read_capabilities.frontier().to_owned();
2925 }
2926 CollectionStateExtra::None => {
2927 soft_panic_or_log!(
2929 "trying to update holds for collection {collection:?} which is not \
2930 an ingestion: {update:?}"
2931 );
2932 continue;
2933 }
2934 CollectionStateExtra::Export(export) => {
2935 let changes = export.read_capabilities.update_iter(update.drain());
2936 update.extend(changes);
2937
2938 let (changes, frontier, _cluster_id) =
2939 collections_net.entry(key).or_insert_with(|| {
2940 (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
2941 });
2942
2943 changes.extend(update.drain());
2944 *frontier = export.read_capabilities.frontier().to_owned();
2945 }
2946 }
2947 } else {
2948 tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
2950 }
2951 }
2952
2953 for (key, (mut changes, frontier, cluster_id)) in collections_net {
2956 if !changes.is_empty() {
2957 if key.is_user() {
2958 debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
2959 }
2960
2961 let collection = self
2962 .collections
2963 .get_mut(&key)
2964 .expect("missing collection state");
2965
2966 let read_holds = match &mut collection.extra_state {
2967 CollectionStateExtra::Ingestion(ingestion) => {
2968 ingestion.dependency_read_holds.as_mut_slice()
2969 }
2970 CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
2971 CollectionStateExtra::None => {
2972 soft_panic_or_log!(
2973 "trying to downgrade read holds for collection which is not an \
2974 ingestion: {collection:?}"
2975 );
2976 continue;
2977 }
2978 };
2979
2980 for read_hold in read_holds.iter_mut() {
2981 read_hold
2982 .try_downgrade(frontier.clone())
2983 .expect("we only advance the frontier");
2984 }
2985
2986 if let Some(instance) = self.instances.get_mut(&cluster_id) {
2988 instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
2989 } else {
2990 soft_panic_or_log!(
2991 "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
2992 );
2993 }
2994 }
2995 }
2996 }
2997
2998 fn validate_collection_ids(
3000 &self,
3001 ids: impl Iterator<Item = GlobalId>,
3002 ) -> Result<(), StorageError<T>> {
3003 for id in ids {
3004 self.storage_collections.check_exists(id)?;
3005 }
3006 Ok(())
3007 }
3008
3009 fn validate_export_ids(
3011 &self,
3012 ids: impl Iterator<Item = GlobalId>,
3013 ) -> Result<(), StorageError<T>> {
3014 for id in ids {
3015 self.export(id)?;
3016 }
3017 Ok(())
3018 }
3019
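    /// Opens a persist write handle for the collection's data shard, using the
    /// given relation description as the value schema.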
3020 async fn open_data_handles(
3028 &self,
3029 id: &GlobalId,
3030 shard: ShardId,
3031 relation_desc: RelationDesc,
3032 persist_client: &PersistClient,
3033 ) -> WriteHandle<SourceData, (), T, StorageDiff> {
3034 let diagnostics = Diagnostics {
3035 shard_name: id.to_string(),
3036 handle_purpose: format!("controller data for {}", id),
3037 };
3038
3039 let mut write = persist_client
3040 .open_writer(
3041 shard,
3042 Arc::new(relation_desc),
3043 Arc::new(UnitSchema),
3044 diagnostics.clone(),
3045 )
3046 .await
3047 .expect("invalid persist usage");
3048
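        // Refresh the handle's cached upper so callers observe a reasonably
        // recent upper rather than the one cached at open time.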
3049 write.fetch_recent_upper().await;
3058
3059 write
3060 }
3061
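    /// Registers an introspection collection with the collection manager,
    /// either as a differential collection (with a read handle for computing
    /// corrections) or as an append-only collection, depending on the
    /// introspection type. In read-only mode, migrated system collections are
    /// still made writable.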
3062 fn register_introspection_collection(
3067 &mut self,
3068 id: GlobalId,
3069 introspection_type: IntrospectionType,
3070 write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
3071 persist_client: PersistClient,
3072 ) -> Result<(), StorageError<T>> {
3073 tracing::info!(%id, ?introspection_type, "registering introspection collection");
3074
3075 let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
3079 if force_writable {
3080 assert!(id.is_system(), "unexpected non-system global id: {id:?}");
3081 info!("writing to migrated storage collection {id} in read-only mode");
3082 }
3083
3084 let prev = self.introspection_ids.insert(introspection_type, id);
3085 assert!(
3086 prev.is_none(),
3087 "cannot have multiple IDs for introspection type"
3088 );
3089
3090 let metadata = self.storage_collections.collection_metadata(id)?.clone();
3091
3092 let read_handle_fn = move || {
3093 let persist_client = persist_client.clone();
3094 let metadata = metadata.clone();
3095
3096 let fut = async move {
3097 let read_handle = persist_client
3098 .open_leased_reader::<SourceData, (), T, StorageDiff>(
3099 metadata.data_shard,
3100 Arc::new(metadata.relation_desc.clone()),
3101 Arc::new(UnitSchema),
3102 Diagnostics {
3103 shard_name: id.to_string(),
3104 handle_purpose: format!("snapshot {}", id),
3105 },
3106 USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3107 )
3108 .await
3109 .expect("invalid persist usage");
3110 read_handle
3111 };
3112
3113 fut.boxed()
3114 };
3115
3116 let recent_upper = write_handle.shared_upper();
3117
3118 match CollectionManagerKind::from(&introspection_type) {
3119 CollectionManagerKind::Differential => {
3124 let statistics_retention_duration =
3125 dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());
3126
3127 let introspection_config = DifferentialIntrospectionConfig {
3129 recent_upper,
3130 introspection_type,
3131 storage_collections: Arc::clone(&self.storage_collections),
3132 collection_manager: self.collection_manager.clone(),
3133 source_statistics: Arc::clone(&self.source_statistics),
3134 sink_statistics: Arc::clone(&self.sink_statistics),
3135 statistics_interval: self.config.parameters.statistics_interval.clone(),
3136 statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
3137 statistics_retention_duration,
3138 metrics: self.metrics.clone(),
3139 introspection_tokens: Arc::clone(&self.introspection_tokens),
3140 };
3141 self.collection_manager.register_differential_collection(
3142 id,
3143 write_handle,
3144 read_handle_fn,
3145 force_writable,
3146 introspection_config,
3147 );
3148 }
3149 CollectionManagerKind::AppendOnly => {
3157 let introspection_config = AppendOnlyIntrospectionConfig {
3158 introspection_type,
3159 config_set: Arc::clone(self.config.config_set()),
3160 parameters: self.config.parameters.clone(),
3161 storage_collections: Arc::clone(&self.storage_collections),
3162 };
3163 self.collection_manager.register_append_only_collection(
3164 id,
3165 write_handle,
3166 force_writable,
3167 Some(introspection_config),
3168 );
3169 }
3170 }
3171
3172 Ok(())
3173 }
3174
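    /// Removes statistics entries for sources and sinks that no longer exist.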
3175 fn reconcile_dangling_statistics(&self) {
3178 self.source_statistics
3179 .lock()
3180 .expect("poisoned")
3181 .source_statistics
3182 .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3184 self.sink_statistics
3185 .lock()
3186 .expect("poisoned")
3187 .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3188 }
3189
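    /// Appends `(GlobalId, ShardId)` mapping updates for the given collections
    /// to the `ShardMapping` introspection collection. `diff` must be +1 for
    /// inserts or -1 for retractions.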
3190 #[instrument(level = "debug")]
3200 fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3201 where
3202 I: Iterator<Item = GlobalId>,
3203 {
3204 mz_ore::soft_assert_or_log!(
3205 diff == Diff::MINUS_ONE || diff == Diff::ONE,
3206 "use 1 for insert or -1 for delete"
3207 );
3208
3209 let id = *self
3210 .introspection_ids
3211 .get(&IntrospectionType::ShardMapping)
3212 .expect("should be registered before this call");
3213
3214 let mut updates = vec![];
3215 let mut row_buf = Row::default();
3217
3218 for global_id in global_ids {
3219 let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3220 collection.collection_metadata.data_shard.clone()
3221 } else {
3222 panic!("unknown global id: {}", global_id);
3223 };
3224
3225 let mut packer = row_buf.packer();
3226 packer.push(Datum::from(global_id.to_string().as_str()));
3227 packer.push(Datum::from(shard_id.to_string().as_str()));
3228 updates.push((row_buf.clone(), diff));
3229 }
3230
3231 self.collection_manager.differential_append(id, updates);
3232 }
3233
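    /// Determines the dependencies of a collection from its data source:
    /// tables depend on their primary collection (if any), ingestions and
    /// ingestion exports depend on themselves and the remap collection, and
    /// sinks depend on themselves and the collection they sink from.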
3234 fn determine_collection_dependencies(
3236 &self,
3237 self_id: GlobalId,
3238 data_source: &DataSource<T>,
3239 ) -> Result<Vec<GlobalId>, StorageError<T>> {
3240 let dependency = match &data_source {
3241 DataSource::Introspection(_)
3242 | DataSource::Webhook
3243 | DataSource::Table { primary: None }
3244 | DataSource::Progress
3245 | DataSource::Other => vec![],
3246 DataSource::Table {
3247 primary: Some(primary),
3248 } => vec![*primary],
3249 DataSource::IngestionExport { ingestion_id, .. } => {
3250 let source_collection = self.collection(*ingestion_id)?;
3253 let ingestion_remap_collection_id = match &source_collection.data_source {
3254 DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3255 _ => unreachable!(
3256 "SourceExport must only refer to primary sources that already exist"
3257 ),
3258 };
3259
3260 vec![self_id, ingestion_remap_collection_id]
3266 }
3267 DataSource::Ingestion(ingestion) => {
3269 vec![self_id, ingestion.remap_collection_id]
3274 }
3275 DataSource::Sink { desc } => {
3276 vec![self_id, desc.sink.from]
3278 }
3279 };
3280
3281 Ok(dependency)
3282 }
3283
3284 async fn read_handle_for_snapshot(
3285 &self,
3286 id: GlobalId,
3287 ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3288 let metadata = self.storage_collections.collection_metadata(id)?;
3289 read_handle_for_snapshot(&self.persist, id, &metadata).await
3290 }
3291
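    /// Routes status updates to the source or sink status history
    /// introspection collections. No-op in read-only mode.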
3292 fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3295 if self.read_only {
3296 return;
3297 }
3298
3299 let mut sink_status_updates = vec![];
3300 let mut source_status_updates = vec![];
3301
3302 for update in updates {
3303 let id = update.id;
3304 if self.export(id).is_ok() {
3305 sink_status_updates.push(update);
3306 } else if self.storage_collections.check_exists(id).is_ok() {
3307 source_status_updates.push(update);
3308 }
3309 }
3310
3311 self.append_status_introspection_updates(
3312 IntrospectionType::SourceStatusHistory,
3313 source_status_updates,
3314 );
3315 self.append_status_introspection_updates(
3316 IntrospectionType::SinkStatusHistory,
3317 sink_status_updates,
3318 );
3319 }
3320
3321 fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3322 self.collections
3323 .get(&id)
3324 .ok_or(StorageError::IdentifierMissing(id))
3325 }
3326
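    /// Sends a `RunIngestion` command for the given ingestion to its storage
    /// instance, after filling in the storage metadata for each source export.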
3327 fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3330 tracing::info!(%id, "starting ingestion");
3331
3332 let collection = self.collection(id)?;
3333 let ingestion_description = match &collection.data_source {
3334 DataSource::Ingestion(i) => i.clone(),
3335 _ => {
3336 tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3337 Err(StorageError::IdentifierInvalid(id))?
3338 }
3339 };
3340
3341 let mut source_exports = BTreeMap::new();
3343 for (
3344 export_id,
3345 SourceExport {
3346 storage_metadata: (),
3347 details,
3348 data_config,
3349 },
3350 ) in ingestion_description.source_exports.clone()
3351 {
3352 let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3353 source_exports.insert(
3354 export_id,
3355 SourceExport {
3356 storage_metadata: export_storage_metadata,
3357 details,
3358 data_config,
3359 },
3360 );
3361 }
3362
3363 let description = IngestionDescription::<CollectionMetadata> {
3364 source_exports,
3365 ingestion_metadata: collection.collection_metadata.clone(),
3368 desc: ingestion_description.desc.clone(),
3370 instance_id: ingestion_description.instance_id,
3371 remap_collection_id: ingestion_description.remap_collection_id,
3372 };
3373
3374 let storage_instance_id = description.instance_id;
3375 let instance = self
3377 .instances
3378 .get_mut(&storage_instance_id)
3379 .ok_or_else(|| StorageError::IngestionInstanceMissing {
3380 storage_instance_id,
3381 ingestion_id: id,
3382 })?;
3383
3384 let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3385 instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3386
3387 Ok(())
3388 }
3389
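    /// Sends a `RunSink` command for the given export to its storage instance,
    /// computing the effective as-of and whether the snapshot still needs to
    /// be emitted.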
3390 fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3393 let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3394 return Err(StorageError::IdentifierMissing(id));
3395 };
3396
3397 let from_storage_metadata = self
3398 .storage_collections
3399 .collection_metadata(description.sink.from)?;
3400 let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3401
3402 let enable_snapshot_frontier =
3406 dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER.get(self.config().config_set());
3407 let export_state = self.storage_collections.collection_frontiers(id)?;
3408 let mut as_of = description.sink.as_of.clone();
3409 as_of.join_assign(&export_state.implied_capability);
3410 let with_snapshot = if enable_snapshot_frontier
3411 && PartialOrder::less_than(&as_of, &export_state.write_frontier)
3412 {
3413 false
3414 } else {
3415 description.sink.with_snapshot
3416 };
3417
3418 info!(
3419 sink_id = %id,
3420 from_id = %description.sink.from,
3421 write_frontier = ?export_state.write_frontier,
3422 ?as_of,
3423 ?with_snapshot,
3424 "run_export"
3425 );
3426
3427 let cmd = RunSinkCommand {
3428 id,
3429 description: StorageSinkDesc {
3430 from: description.sink.from,
3431 from_desc: description.sink.from_desc.clone(),
3432 connection: description.sink.connection.clone(),
3433 envelope: description.sink.envelope,
3434 as_of,
3435 version: description.sink.version,
3436 from_storage_metadata,
3437 with_snapshot,
3438 to_storage_metadata,
3439 },
3440 };
3441
3442 let storage_instance_id = description.instance_id.clone();
3443
3444 let instance = self
3445 .instances
3446 .get_mut(&storage_instance_id)
3447 .ok_or_else(|| StorageError::ExportInstanceMissing {
3448 storage_instance_id,
3449 export_id: id,
3450 })?;
3451
3452 instance.send(StorageCommand::RunSink(Box::new(cmd)));
3453
3454 Ok(())
3455 }
3456
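    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the currently reported frontiers against the
    /// previously recorded ones.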
3457 fn update_frontier_introspection(&mut self) {
3462 let mut global_frontiers = BTreeMap::new();
3463 let mut replica_frontiers = BTreeMap::new();
3464
3465 for collection_frontiers in self.storage_collections.active_collection_frontiers() {
3466 let id = collection_frontiers.id;
3467 let since = collection_frontiers.read_capabilities;
3468 let upper = collection_frontiers.write_frontier;
3469
3470 let instance = self
3471 .collections
3472 .get(&id)
3473 .and_then(|collection_state| match &collection_state.extra_state {
3474 CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
3475 CollectionStateExtra::Export(export) => Some(export.cluster_id()),
3476 CollectionStateExtra::None => None,
3477 })
3478 .and_then(|i| self.instances.get(&i));
3479
3480 if let Some(instance) = instance {
3481 for replica_id in instance.replica_ids() {
3482 replica_frontiers.insert((id, replica_id), upper.clone());
3483 }
3484 }
3485
3486 global_frontiers.insert(id, (since, upper));
3487 }
3488
3489 let mut global_updates = Vec::new();
3490 let mut replica_updates = Vec::new();
3491
3492 let mut push_global_update =
3493 |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
3494 let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
3495 let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3496 let row = Row::pack_slice(&[
3497 Datum::String(&id.to_string()),
3498 read_frontier,
3499 write_frontier,
3500 ]);
3501 global_updates.push((row, diff));
3502 };
3503
3504 let mut push_replica_update =
3505 |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
3506 let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3507 let row = Row::pack_slice(&[
3508 Datum::String(&id.to_string()),
3509 Datum::String(&replica_id.to_string()),
3510 write_frontier,
3511 ]);
3512 replica_updates.push((row, diff));
3513 };
3514
3515 let mut old_global_frontiers =
3516 std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
3517 for (&id, new) in &self.recorded_frontiers {
3518 match old_global_frontiers.remove(&id) {
3519 Some(old) if &old != new => {
3520 push_global_update(id, new.clone(), Diff::ONE);
3521 push_global_update(id, old, Diff::MINUS_ONE);
3522 }
3523 Some(_) => (),
3524 None => push_global_update(id, new.clone(), Diff::ONE),
3525 }
3526 }
3527 for (id, old) in old_global_frontiers {
3528 push_global_update(id, old, Diff::MINUS_ONE);
3529 }
3530
3531 let mut old_replica_frontiers =
3532 std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
3533 for (&key, new) in &self.recorded_replica_frontiers {
3534 match old_replica_frontiers.remove(&key) {
3535 Some(old) if &old != new => {
3536 push_replica_update(key, new.clone(), Diff::ONE);
3537 push_replica_update(key, old, Diff::MINUS_ONE);
3538 }
3539 Some(_) => (),
3540 None => push_replica_update(key, new.clone(), Diff::ONE),
3541 }
3542 }
3543 for (key, old) in old_replica_frontiers {
3544 push_replica_update(key, old, Diff::MINUS_ONE);
3545 }
3546
3547 let id = self.introspection_ids[&IntrospectionType::Frontiers];
3548 self.collection_manager
3549 .differential_append(id, global_updates);
3550
3551 let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
3552 self.collection_manager
3553 .differential_append(id, replica_updates);
3554 }
3555
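    /// Refreshes wallclock lag tracking for all active collections: updates
    /// each collection's running maximum and metrics and, if enabled,
    /// accumulates histogram buckets, before possibly recording the collected
    /// values to introspection.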
3556 fn refresh_wallclock_lag(&mut self) {
3575 let now_ms = (self.now)();
3576 let histogram_period =
3577 WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3578
3579 let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
3580 Some(ts) => (self.wallclock_lag)(ts.clone()),
3581 None => Duration::ZERO,
3582 };
3583
3584 for frontiers in self.storage_collections.active_collection_frontiers() {
3585 let id = frontiers.id;
3586 let Some(collection) = self.collections.get_mut(&id) else {
3587 continue;
3588 };
3589
3590 let collection_unreadable =
3591 PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
3592 let lag = if collection_unreadable {
3593 WallclockLag::Undefined
3594 } else {
3595 let lag = frontier_lag(&frontiers.write_frontier);
3596 WallclockLag::Seconds(lag.as_secs())
3597 };
3598
3599 collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);
3600
3601 let secs = lag.unwrap_seconds_or(u64::MAX);
3604 collection.wallclock_lag_metrics.observe(secs);
3605
3606 if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
3607 continue;
3608 }
3609
3610 if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
3611 let bucket = lag.map_seconds(|secs| secs.next_power_of_two());
3612
3613 let instance_id = match &collection.extra_state {
3614 CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3615 CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3616 CollectionStateExtra::None => None,
3617 };
3618 let workload_class = instance_id
3619 .and_then(|id| self.instances.get(&id))
3620 .and_then(|i| i.workload_class.clone());
3621 let labels = match workload_class {
3622 Some(wc) => [("workload_class", wc.clone())].into(),
3623 None => BTreeMap::new(),
3624 };
3625
3626 let key = (histogram_period, bucket, labels);
3627 *stash.entry(key).or_default() += Diff::ONE;
3628 }
3629 }
3630
3631 self.maybe_record_wallclock_lag();
3633 }
3634
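    /// At most once per recording interval, emits the collected wallclock lag
    /// maxima and histogram buckets to the corresponding introspection
    /// collections. No-op in read-only mode.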
3635 fn maybe_record_wallclock_lag(&mut self) {
3643 if self.read_only {
3644 return;
3645 }
3646
3647 let duration_trunc = |datetime: DateTime<_>, interval| {
3648 let td = TimeDelta::from_std(interval).ok()?;
3649 datetime.duration_trunc(td).ok()
3650 };
3651
3652 let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3653 let now_dt = mz_ore::now::to_datetime((self.now)());
3654 let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3655 soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3656 let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3657 duration_trunc(now_dt, *default).unwrap()
3658 });
3659 if now_trunc <= self.wallclock_lag_last_recorded {
3660 return;
3661 }
3662
3663 let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3664
3665 let mut history_updates = Vec::new();
3666 let mut histogram_updates = Vec::new();
3667 let mut row_buf = Row::default();
3668 for frontiers in self.storage_collections.active_collection_frontiers() {
3669 let id = frontiers.id;
3670 let Some(collection) = self.collections.get_mut(&id) else {
3671 continue;
3672 };
3673
3674 let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
3675 let row = Row::pack_slice(&[
3676 Datum::String(&id.to_string()),
3677 Datum::Null,
3678 max_lag.into_interval_datum(),
3679 Datum::TimestampTz(now_ts),
3680 ]);
3681 history_updates.push((row, Diff::ONE));
3682
3683 let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3684 continue;
3685 };
3686
3687 for ((period, lag, labels), count) in std::mem::take(stash) {
3688 let mut packer = row_buf.packer();
3689 packer.extend([
3690 Datum::TimestampTz(period.start),
3691 Datum::TimestampTz(period.end),
3692 Datum::String(&id.to_string()),
3693 lag.into_uint64_datum(),
3694 ]);
3695 let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3696 packer.push_dict(labels);
3697
3698 histogram_updates.push((row_buf.clone(), count));
3699 }
3700 }
3701
3702 if !history_updates.is_empty() {
3703 self.append_introspection_updates(
3704 IntrospectionType::WallclockLagHistory,
3705 history_updates,
3706 );
3707 }
3708 if !histogram_updates.is_empty() {
3709 self.append_introspection_updates(
3710 IntrospectionType::WallclockLagHistogram,
3711 histogram_updates,
3712 );
3713 }
3714
3715 self.wallclock_lag_last_recorded = now_trunc;
3716 }
3717
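    /// Performs periodic maintenance: refreshes frontier introspection,
    /// wallclock lag tracking, and per-instance state metrics.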
3718 fn maintain(&mut self) {
3723 self.update_frontier_introspection();
3724 self.refresh_wallclock_lag();
3725
3726 for instance in self.instances.values_mut() {
3728 instance.refresh_state_metrics();
3729 }
3730 }
3731}
3732
3733impl From<&IntrospectionType> for CollectionManagerKind {
3734 fn from(value: &IntrospectionType) -> Self {
3735 match value {
3736 IntrospectionType::ShardMapping
3737 | IntrospectionType::Frontiers
3738 | IntrospectionType::ReplicaFrontiers
3739 | IntrospectionType::StorageSourceStatistics
3740 | IntrospectionType::StorageSinkStatistics
3741 | IntrospectionType::ComputeDependencies
3742 | IntrospectionType::ComputeOperatorHydrationStatus
3743 | IntrospectionType::ComputeMaterializedViewRefreshes
3744 | IntrospectionType::ComputeErrorCounts
3745 | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3746
3747 IntrospectionType::SourceStatusHistory
3748 | IntrospectionType::SinkStatusHistory
3749 | IntrospectionType::PrivatelinkConnectionStatusHistory
3750 | IntrospectionType::ReplicaStatusHistory
3751 | IntrospectionType::ReplicaMetricsHistory
3752 | IntrospectionType::WallclockLagHistory
3753 | IntrospectionType::WallclockLagHistogram
3754 | IntrospectionType::PreparedStatementHistory
3755 | IntrospectionType::StatementExecutionHistory
3756 | IntrospectionType::SessionHistory
3757 | IntrospectionType::StatementLifecycleHistory
3758 | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3759 }
3760 }
3761}
3762
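/// Reads back the current contents of a statistics collection as of one tick
/// before `upper`, returning no rows if the collection has not advanced past
/// the minimum timestamp.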
3763async fn snapshot_statistics<T>(
3769 id: GlobalId,
3770 upper: Antichain<T>,
3771 storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3772) -> Vec<Row>
3773where
3774 T: Codec64 + From<EpochMillis> + TimestampManipulation,
3775{
3776 match upper.as_option() {
3777 Some(f) if f > &T::minimum() => {
3778 let as_of = f.step_back().unwrap();
3779
3780 let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3781 snapshot
3782 .into_iter()
3783 .map(|(row, diff)| {
3784 assert_eq!(diff, 1);
3785 row
3786 })
3787 .collect()
3788 }
3789 _ => Vec::new(),
3792 }
3793}
3794
3795async fn read_handle_for_snapshot<T>(
3796 persist: &PersistClientCache,
3797 id: GlobalId,
3798 metadata: &CollectionMetadata,
3799) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3800where
3801 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3802{
3803 let persist_client = persist
3804 .open(metadata.persist_location.clone())
3805 .await
3806 .unwrap();
3807
3808 let read_handle = persist_client
3813 .open_leased_reader::<SourceData, (), _, _>(
3814 metadata.data_shard,
3815 Arc::new(metadata.relation_desc.clone()),
3816 Arc::new(UnitSchema),
3817 Diagnostics {
3818 shard_name: id.to_string(),
3819 handle_purpose: format!("snapshot {}", id),
3820 },
3821 USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3822 )
3823 .await
3824 .expect("invalid persist usage");
3825 Ok(read_handle)
3826}
3827
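/// Controller state for a single collection: its data source, persist
/// metadata, ingestion- or export-specific extra state, and wallclock lag
/// tracking.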
3828#[derive(Debug)]
3830struct CollectionState<T: TimelyTimestamp> {
3831 pub data_source: DataSource<T>,
3833
3834 pub collection_metadata: CollectionMetadata,
3835
3836 pub extra_state: CollectionStateExtra<T>,
3837
3838 wallclock_lag_max: WallclockLag,
3840 wallclock_lag_histogram_stash: Option<
3847 BTreeMap<
3848 (
3849 WallclockLagHistogramPeriod,
3850 WallclockLag,
3851 BTreeMap<&'static str, String>,
3852 ),
3853 Diff,
3854 >,
3855 >,
3856 wallclock_lag_metrics: WallclockLagMetrics,
3858}
3859
3860impl<T: TimelyTimestamp> CollectionState<T> {
3861 fn new(
3862 data_source: DataSource<T>,
3863 collection_metadata: CollectionMetadata,
3864 extra_state: CollectionStateExtra<T>,
3865 wallclock_lag_metrics: WallclockLagMetrics,
3866 ) -> Self {
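        // Lag histogram buckets are stashed here and flushed by
        // `maybe_record_wallclock_lag`; `DataSource::Other` collections do not
        // participate in histogram collection.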
3867 let wallclock_lag_histogram_stash = match &data_source {
3871 DataSource::Other => None,
3872 _ => Some(Default::default()),
3873 };
3874
3875 Self {
3876 data_source,
3877 collection_metadata,
3878 extra_state,
3879 wallclock_lag_max: WallclockLag::MIN,
3880 wallclock_lag_histogram_stash,
3881 wallclock_lag_metrics,
3882 }
3883 }
3884}
3885
3886#[derive(Debug)]
3888enum CollectionStateExtra<T: TimelyTimestamp> {
3889 Ingestion(IngestionState<T>),
3890 Export(ExportState<T>),
3891 None,
3892}
3893
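/// Ingestion-specific state: accumulated read capabilities and the derived
/// since, read holds on dependencies, the reported write frontier, the hold
/// policy, the owning storage instance, and the replicas that have hydrated
/// the ingestion.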
3894#[derive(Debug)]
3896struct IngestionState<T: TimelyTimestamp> {
3897 pub read_capabilities: MutableAntichain<T>,
3899
3900 pub derived_since: Antichain<T>,
3903
3904 pub dependency_read_holds: Vec<ReadHold<T>>,
3906
3907 pub write_frontier: Antichain<T>,
3909
3910 pub hold_policy: ReadPolicy<T>,
3917
3918 pub instance_id: StorageInstanceId,
3920
3921 pub hydrated_on: BTreeSet<ReplicaId>,
3923}
3924
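/// Describes a status history introspection collection: how long entries are
/// retained and how to extract the key and event time from a row.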
3925struct StatusHistoryDesc<K> {
3930 retention_policy: StatusHistoryRetentionPolicy,
3931 extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
3932 extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
3933}
3934enum StatusHistoryRetentionPolicy {
3935 LastN(usize),
3937 TimeWindow(Duration),
3939}
3940
3941fn source_status_history_desc(
3942 params: &StorageParameters,
3943) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3944 let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3945 let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3946 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3947 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3948
3949 StatusHistoryDesc {
3950 retention_policy: StatusHistoryRetentionPolicy::LastN(
3951 params.keep_n_source_status_history_entries,
3952 ),
3953 extract_key: Box::new(move |datums| {
3954 (
3955 GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3956 if datums[replica_id_idx].is_null() {
3957 None
3958 } else {
3959 Some(
3960 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3961 .expect("ReplicaId column"),
3962 )
3963 },
3964 )
3965 }),
3966 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3967 }
3968}
3969
3970fn sink_status_history_desc(
3971 params: &StorageParameters,
3972) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3973 let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3974 let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3975 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3976 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3977
3978 StatusHistoryDesc {
3979 retention_policy: StatusHistoryRetentionPolicy::LastN(
3980 params.keep_n_sink_status_history_entries,
3981 ),
3982 extract_key: Box::new(move |datums| {
3983 (
3984 GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
3985 if datums[replica_id_idx].is_null() {
3986 None
3987 } else {
3988 Some(
3989 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3990 .expect("ReplicaId column"),
3991 )
3992 },
3993 )
3994 }),
3995 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3996 }
3997}
3998
3999fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
4000 let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
4001 let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
4002 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4003
4004 StatusHistoryDesc {
4005 retention_policy: StatusHistoryRetentionPolicy::LastN(
4006 params.keep_n_privatelink_status_history_entries,
4007 ),
4008 extract_key: Box::new(move |datums| {
4009 GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4010 }),
4011 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4012 }
4013}
4014
4015fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4016 let desc = &REPLICA_STATUS_HISTORY_DESC;
4017 let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4018 let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4019 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4020
4021 StatusHistoryDesc {
4022 retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4023 params.replica_status_history_retention_window,
4024 ),
4025 extract_key: Box::new(move |datums| {
4026 (
4027 GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4028 datums[process_idx].unwrap_uint64(),
4029 )
4030 }),
4031 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4032 }
4033}
4034
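/// If `from` is less than or equal to `replace_with`, swaps the two and
/// returns a `ChangeBatch` recording +1 for the new frontier elements and -1
/// for the old ones; otherwise leaves `from` unchanged and returns an empty
/// batch.
///
/// Illustrative sketch (not a doctest): swapping `{2}` in for `{5}`... rather,
/// replacing a frontier of `{2}` with `{5}` yields the updates
/// `{(5, +1), (2, -1)}`.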
4035fn swap_updates<T: Timestamp>(
4037 from: &mut Antichain<T>,
4038 mut replace_with: Antichain<T>,
4039) -> ChangeBatch<T> {
4040 let mut update = ChangeBatch::new();
4041 if PartialOrder::less_equal(from, &replace_with) {
4042 update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
4043 std::mem::swap(from, &mut replace_with);
4044 update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
4045 }
4046 update
4047}