use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
    WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::RustType;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
    TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

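/// A oneshot ingestion that is currently in flight, tracked so that the
/// result callback can be invoked when the ingestion completes or is
/// canceled.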
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
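    /// Consumes the pending ingestion, reporting to the caller that it was
    /// canceled.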
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

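/// A storage controller for a storage instance.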
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    build_info: &'static BuildInfo,
    now: NowFn,

    read_only: bool,

    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

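    /// Objects that have been dropped but for which the drop has not yet been
    /// acknowledged by all replicas that were running them.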
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    #[derivative(Debug = "ignore")]
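    /// Oneshot ingestions that are currently in progress, keyed by ingestion
    /// UUID.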
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    statistics_interval_sender: Sender<Duration>,

    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    initialized: bool,
    config: StorageConfiguration,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    metrics: StorageControllerMetrics,
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    wallclock_lag_last_recorded: DateTime<Utc>,

    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    migrated_storage_collections: BTreeSet<GlobalId>,

    maintenance_ticker: tokio::time::Interval,
    maintenance_scheduled: bool,

    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

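    /// A background task that warms persist state during startup. Dropping
    /// the handle aborts the task if it is still running.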
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

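/// Spawns a background task that warms persist state for `shard_ids` by
/// eagerly opening batch fetchers, priming the persist caches before the
/// shards are opened for real during bootstrap.
///
/// A sketch of the intended use (hypothetical surrounding code, assuming
/// `mz_ore`'s `abort_on_drop` adapter):
///
/// ```ignore
/// let task = warm_persist_state_in_background(client, shard_ids.into_iter());
/// // Stash the handle; dropping it aborts the warm-up once it is no longer
/// // useful (see `create_collections_for_bootstrap`).
/// let persist_warm_task = Some(task.abort_on_drop());
/// ```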
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

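    /// Propagates updated configuration parameters to every component that
    /// holds a copy: the storage collections, the persist client cache, all
    /// running instances, and this controller's own config.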
    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on an instance without replicas are trivially
            // considered hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                Ok(!ingestion_state.hydrated_on.is_empty())
            }
            CollectionStateExtra::Export(_) => Ok(true),
            CollectionStateExtra::None => Ok(true),
        }
    }

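    /// Returns whether all applicable collections on the given cluster are
    /// hydrated on at least one of the target replicas (or on any replica, if
    /// no targets are given). Transient and excluded collections are skipped.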
    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        // An empty set of target replicas is trivially hydrated.
        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
            return Ok(true);
        }

        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    if &state.instance_id != target_cluster_id {
                        continue;
                    }
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => !state.hydrated_on.is_empty(),
                    }
                }
                CollectionStateExtra::Export(_) => true,
                CollectionStateExtra::None => true,
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
            }
        }
        Ok(all_hydrated)
    }

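    /// Returns the since (implied capability) and upper (write frontier) of
    /// the identified collection.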
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        // Exports are maintained by the controller directly; all other
        // collections are delegated to the storage collections.
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestions(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        Box::new(self.instances[&instance_id].active_ingestions())
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

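    /// Registers a new storage instance, forwarding the current
    /// initialization state, write permission, and configuration. Panics if
    /// the instance already exists.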
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            metrics,
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
        enable_ctp: bool,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
            enable_ctp,
        };
        instance.add_replica(replica_id, config);
    }

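    /// Drops a replica from an instance, recording `Paused` status updates
    /// for the sources and sinks that were running on it.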
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for id in instance.active_ingestions() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // Record a paused status for all subsources of the ingestion,
            // except for its remap collection.
            let subsource_ids = ingestion_description
                .collection_ids()
                .filter(|id| id != &ingestion_description.remap_collection_id);
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

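    /// Evolves the persist schema of each given collection to the provided
    /// `RelationDesc` (e.g. to relax nullability during bootstrap), returning
    /// an error if persist reports a race or an incompatible evolution.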
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

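    /// Creates the described collections: registers them with
    /// `StorageCollections`, opens persist write handles, builds
    /// per-collection state, hands tables, webhooks, and introspection
    /// collections to their workers, and finally runs any ingestions and
    /// exports that are ready.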
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

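        // The warm-up task has served its purpose by now; dropping the handle
        // aborts it if it is still running.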
        drop(self.persist_warm_task.take());

        // Reject duplicate collection IDs.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
                    Ok(shard)
                };

                let remap_shard = match &description.data_source {
                    DataSource::Ingestion(IngestionDescription {
                        remap_collection_id,
                        ..
                    }) => Some(get_shard(*remap_collection_id)?),
                    _ => None,
                };

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    remap_shard,
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Open persist write handles for all collections concurrently.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!(
                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
                        id, metadata.remap_shard, metadata.data_shard
                    );

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

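        // IDs of collections that must be executed on a cluster once created,
        // and IDs of brand-new collections (used to update shard mappings
        // below).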
        let mut to_execute = BTreeSet::new();
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        to_register.sort_by_key(|(id, ..)| *id);

        // Register tables first, in reverse ID order, followed by all other
        // collections.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

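        // Build up state for each collection and route it to the right
        // worker: the table worker, the collection manager, or a cluster
        // instance.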
        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let mut data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            // Ensure the ingestion knows about its own primary source export.
            if let DataSource::Ingestion(ingestion) = &mut data_source {
                let export = ingestion.desc.primary_source_export();
                ingestion.source_exports.insert(id, export);
            }

            let write_frontier = write.upper();

            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            // Check invariants that must hold between a collection's upper
            // and the since of its dependencies.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                    dependent ({id}): upper {:?} \n
                    dependency since {:?} \n
                    dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

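            // Determine any extra collection state and, where applicable, the
            // instance that should run this collection.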
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the export itself does nothing; schedule its
                    // source for (re-)execution instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

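        // Seed statistics entries for newly created webhook collections.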
        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }
        }

        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we only register migrated tables with
                // the table worker.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // Execute any ingestions or exports that are ready to run.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table { .. }
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

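    /// Checks whether the described alteration of `ingestion_id`'s
    /// `SourceDesc` is permitted, failing if the collection is not an
    /// ingestion or the new description is not a compatible alteration.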
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let source_collection = self.collection(ingestion_id)?;
        let data_source = &source_collection.data_source;
        match &data_source {
            DataSource::Ingestion(cur_ingestion) => {
                cur_ingestion
                    .desc
                    .alter_compatible(ingestion_id, source_desc)?;
            }
            o => {
                tracing::info!(
                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
                    o
                );
                Err(AlterError { id: ingestion_id })?
            }
        }

        Ok(())
    }

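    /// Installs the new source connections and restarts any ingestions whose
    /// connection actually changed.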
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_connections(source_connections.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (id, conn) in source_connections {
            let collection = self
                .collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is no sense in
                    // re-rendering the dataflow.
                    if ingestion.desc.connection != conn {
                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                        ingestions_to_run.insert(id);
                    } else {
                        tracing::warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

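    /// Installs new data configs for the given source exports and restarts
    /// the ingestions whose configs actually changed.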
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_export_data_configs(source_exports.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // Update the data config on the source export's own collection.
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Also update the copy stored on the primary source's ingestion
            // description.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // If the data config hasn't changed, there is no sense in
                    // re-rendering the dataflow.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

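    /// Registers a new collection for `new_collection`, backed by the same
    /// data shard as `existing_collection` but with an evolved schema, and
    /// points the existing table at it.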
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if !matches!(existing.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            remap_shard: None,
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            collection_desc.data_source.clone(),
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        // Record the new collection and mark the existing table as pointing
        // at it.
        self.collections.insert(new_collection, collection_state);
        let existing = self
            .collections
            .get_mut(&existing_collection)
            .expect("missing existing collection");
        assert!(matches!(
            existing.data_source,
            DataSource::Table { primary: None }
        ));
        existing.data_source = DataSource::Table {
            primary: Some(new_collection),
        };

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }

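    /// Returns the export state for `id`, if the collection exists and is an
    /// export.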
    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get(&id)
            .and_then(|c| match &c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get_mut(&id)
            .and_then(|c| match &mut c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

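    /// Starts a oneshot ingestion on the given instance, stashing the result
    /// callback until the ingestion completes or is canceled. Fails in
    /// read-only mode.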
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }

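    /// Cancels a pending oneshot ingestion, notifying both the cluster that
    /// runs it and the caller awaiting its result.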
    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        pending.cancel();

        Ok(())
    }

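    /// Replaces the description of export `id` and sends an updated
    /// `RunSinkCommand` to its instance, after checking that the new input is
    /// still readable at the export's write frontier.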
    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        // Acquire read holds to ensure that neither the sinked collection nor
        // the export itself is dropped while we alter the export.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Check whether the sink's input is readable at its write frontier.
        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot: new_description.sink.with_snapshot,
                to_storage_metadata,
            },
        };

        // Fetch the client for this export's cluster.
        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }

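    /// Swaps in new sink connections and re-runs the affected exports on
    /// their instances.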
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            // Fetch the current description and as-of of the export.
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Ensure the new connection is a compatible alteration.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the client for this export's cluster.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update state only after all fallible operations have succeeded.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }

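    /// Drops the given table collections: table write handles are dropped via
    /// the table worker, while ingestion-export- and webhook-backed tables
    /// are dropped like sources.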
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
            .into_iter()
            .partition(|id| match self.collections[id].data_source {
                DataSource::Table { .. } => true,
                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                _ => panic!("identifier is not a table: {}", id),
            });

        if !table_write_ids.is_empty() {
            let drop_notif = self
                .persist_table_worker
                .drop_handles(table_write_ids.clone(), ts);
            let tx = self.pending_table_handle_drops_tx.clone();
            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
                drop_notif.await;
                for identifier in table_write_ids {
                    let _ = tx.send(identifier);
                }
            });
        }

        if !data_source_ids.is_empty() {
            self.validate_collection_ids(data_source_ids.iter().cloned())?;
            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
        }

        Ok(())
    }

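    /// Drops the given source collections after validating the identifiers.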
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_collection_ids(identifiers.iter().cloned())?;
        self.drop_sources_unvalidated(storage_metadata, identifiers)
    }

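    /// Like `drop_sources`, but skips identifier validation. Releases read
    /// holds, updates statistics and status history, and removes the dropped
    /// collections from the controller state.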
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Ingestions (and their exports) are handled differently from other
        // collections, so we sort them out first.
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Defer the cleanup in case data arrives right
                        // before/while we drop the collection.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping an export requires re-executing its
                        // ingestion with an updated description.
                        ingestions_to_execute.insert(ingestion_id);

                        // Adjust the source to no longer contain this export.
                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not re-execute ingestions that are about to be dropped.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Drop the ingestions by releasing their read holds.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Update the shard mappings for all dropped collections.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            // If replicas are still running this object, remember them so we
            // can match their eventual `DroppedId` responses.
            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    // The remap collection of an ingestion is dropped
                    // alongside it, so await its `DroppedId` responses too.
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            self.dropped_objects.insert(
                                ingestion_desc.remap_collection_id,
                                active_replicas.clone(),
                            );
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }

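    /// Drops the given sinks after validating the identifiers.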
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_export_ids(identifiers.iter().cloned())?;
        self.drop_sinks_unvalidated(storage_metadata, identifiers);
        Ok(())
    }

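    /// Like `drop_sinks`, but skips identifier validation. Releases read
    /// holds, records status updates, and removes export state.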
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        // Ignore exports that do not exist (anymore).
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        // Drop the exports by releasing their read holds.
        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        let status_now = mz_ore::now::to_datetime((self.now)());

        // Record a dropped status and drop the statistics for each sink.
        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            // If replicas are still running this export, remember them so we
            // can match their eventual `DroppedId` responses.
            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }

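    /// Appends `commands` to the identified tables at `write_ts`, advancing
    /// their uppers to `advance_to`. In read-only mode, only migrated system
    /// collections may be written.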
    #[instrument(level = "debug")]
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        if self.read_only {
            // Migrated system collections are the only ones we may write to
            // in read-only mode.
            if !commands
                .iter()
                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
            {
                return Err(StorageError::ReadOnly);
            }
        }

        // A nonempty append must advance the upper (write_ts < advance_to).
        for (id, updates) in commands.iter() {
            if !updates.is_empty() {
                if !write_ts.less_than(&advance_to) {
                    return Err(StorageError::UpdateBeyondUpper(*id));
                }
            }
        }

        Ok(self
            .persist_table_worker
            .append(write_ts, advance_to, commands))
    }

    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }

    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
        let source_statistics = self.source_statistics.lock().expect("poisoned");
        source_statistics
            .webhook_statistics
            .get(&id)
            .cloned()
            .ok_or(StorageError::IdentifierMissing(id))
    }

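    /// Waits until a call to `process` would do non-trivial work: either
    /// maintenance is due, table handle drops are pending, or instance
    /// responses have arrived.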
    async fn ready(&mut self) {
        if self.maintenance_scheduled {
            return;
        }

        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }

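    /// Processes work queued up by `ready`: scheduled maintenance, failed
    /// replicas to rehydrate, and stashed instance responses (frontier
    /// updates, dropped-object acknowledgements, statistics, and status
    /// updates).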
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        // Perform periodic maintenance work, if scheduled.
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        // Take the stashed responses so we can process them without holding a
        // borrow on `self`.
        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    {
                        // Statistics are per replica; an update without a
                        // replica ID cannot be attributed, so log an error and
                        // skip it.
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // The collection might have been dropped in the
                            // meantime; skip updates for unknown collections.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    {
                        // The same applies to sink statistics.
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSinkStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    // Keep the per-replica hydration status of ingestions up
                    // to date, based on the reported status.
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => match collection.extra_state {
                                    CollectionStateExtra::Ingestion(ref mut ingestion_state) => {
                                        if ingestion_state.hydrated_on.is_empty() {
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                        }
                                        ingestion_state.hydrated_on.insert(replica_id.expect(
                                            "replica id should be present for status running",
                                        ));
                                    }
                                    CollectionStateExtra::Export(_) => {
                                        // Exports don't track hydration.
                                    }
                                    CollectionStateExtra::None => {
                                        // Collections without extra state don't
                                        // track hydration.
                                    }
                                },
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => match collection.extra_state {
                                    CollectionStateExtra::Ingestion(ref mut ingestion_state) => {
                                        tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                        ingestion_state.hydrated_on.clear();
                                    }
                                    CollectionStateExtra::Export(_) => {
                                        // Exports don't track hydration.
                                    }
                                    CollectionStateExtra::None => {
                                        // Collections without extra state don't
                                        // track hydration.
                                    }
                                },
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Clean up the now-complete ingestion on the
                                // cluster.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                // Hand the staged batches back to the waiting
                                // caller.
                                (pending.result_tx)(batches)
                            }
                            None => {
                                // The ingestion was already canceled or
                                // completed; nothing to do.
                            }
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        // Process all pending table handle drops in one batch.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }

    async fn inspect_persist_state(
        &self,
        id: GlobalId,
    ) -> Result<serde_json::Value, anyhow::Error> {
        let collection = &self.storage_collections.collection_metadata(id)?;
        let client = self
            .persist
            .open(collection.persist_location.clone())
            .await?;
        let shard_state = client
            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
            .await?;
        let json_state = serde_json::to_value(shard_state)?;
        Ok(json_state)
    }

    fn append_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<(Row, Diff)>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates = updates.into_iter().map(|update| update.into()).collect();
        self.collection_manager.blind_write(id, updates);
    }

    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
        if !updates.is_empty() {
            self.collection_manager.blind_write(id, updates);
        }
    }

    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }

    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }

    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }

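    /// Returns a future that resolves to a timestamp at which all of the
    /// given user sources are "real-time recent", i.e. at which they have
    /// ingested all data their upstream systems contained when this method
    /// was called. Objects without a notion of real-time recency (e.g. load
    /// generator sources) are skipped; each per-source probe is subject to
    /// `timeout`.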
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Only user sources can be read from with real-time recency.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                // Not a storage item, which we can ignore.
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    // Load generator sources have no external system to be
                    // recent with respect to.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                // Only ingestions can be real-time recent.
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            // Fetch a read handle for the remap shard, from which we read the
            // source's progress.
            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Also acquire a read hold, so the since of the remap shard cannot
            // advance while we are reading from it.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
                        .await.map_err(|e| {
                            tracing::debug!(?id, "real time recency error: {:?}", e);
                            e
                        });

                    // Keep the read hold alive for the duration of the
                    // real-time recency query.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
}

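/// Ensures the backing state for the storage controller is initialized:
/// allocates a fresh txn-WAL shard if none has been written to `txn` yet.
///
/// This must run (and the transaction commit) before a [`Controller`] is
/// created. A minimal usage sketch, assuming `txn` is some `StorageTxn`
/// implementation:
///
/// ```ignore
/// prepare_initialization(&mut txn)?;
/// let controller = Controller::new(/* ... */, &txn, storage_collections).await;
/// ```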
pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
    if txn.get_txn_wal_shard().is_none() {
        let txns_id = ShardId::new();
        txn.write_txn_wal_shard(txns_id)?;
    }

    Ok(())
}

impl<T> Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
    Self: StorageController<Timestamp = T>,
{
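    /// Creates a new storage controller from the given configuration.
    ///
    /// The given `txn` must already contain a txn-WAL shard; see
    /// [`prepare_initialization`].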
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        // The txn-WAL shard must already be durably recorded;
        // `prepare_initialization` is responsible for creating it.
        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }

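    /// Installs the given read hold policies on the identified ingestions and
    /// exports, and propagates any resulting read capability changes.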
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions and exports");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

    #[instrument(level = "debug", fields(updates))]
    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
        let mut read_capability_changes = BTreeMap::default();

        if let Some(collection) = self.collections.get_mut(&id) {
            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => (
                    &mut ingestion.write_frontier,
                    &mut ingestion.derived_since,
                    &ingestion.hold_policy,
                ),
                CollectionStateExtra::None => {
                    // Progress collections advance their write frontier
                    // without having ingestion or export state, so this is
                    // expected; anything else is a bug.
                    if !matches!(collection.data_source, DataSource::Progress) {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
                CollectionStateExtra::Export(export) => (
                    &mut export.write_frontier,
                    &mut export.derived_since,
                    &export.read_policy,
                ),
            };

            if PartialOrder::less_than(write_frontier, new_upper) {
                write_frontier.clone_from(new_upper);
            }

            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
            let mut update = swap_updates(derived_since, new_derived_since);
            if !update.is_empty() {
                read_capability_changes.insert(id, update);
            }
        } else if self.dropped_objects.contains_key(&id) {
            // Frontier updates for recently dropped objects are expected while
            // their replicas wind down; ignore them.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

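    /// Applies `updates` to the read capabilities of the identified
    /// ingestions and exports, downgrades their dependency read holds
    /// accordingly, and sends `AllowCompaction` commands to the owning
    /// instances.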
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        // Location to record consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the maximum ID and apply what we know about it.
        // Collections generally have greater IDs than their dependencies, so
        // this processes dependents first.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }

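    /// Validates that a collection exists for every given identifier.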
    fn validate_collection_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.storage_collections.check_exists(id)?;
        }
        Ok(())
    }

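    /// Validates that an export exists for every given identifier.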
    fn validate_export_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.export(id)?;
        }
        Ok(())
    }

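    /// Opens a persist write handle for the given shard.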
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // Fetch the latest upper, so the handle is aware of any writes that
        // happened before this process started.
        write.fetch_recent_upper().await;

        write
    }

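    /// Registers the given introspection collection with the collection
    /// manager, so that its contents are kept up to date. Differential
    /// collections additionally get a read handle factory for snapshotting
    /// their current contents.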
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // In read-only mode, introspection collections are not writable, with
        // the exception of migrated collections, which only this environment
        // knows about.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }

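    /// Removes statistics for sources and sinks that no longer exist.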
    fn reconcile_dangling_statistics(&self) {
        self.source_statistics
            .lock()
            .expect("poisoned")
            .source_statistics
            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
        self.sink_statistics
            .lock()
            .expect("poisoned")
            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
    }

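    /// Appends global ID → shard ID mappings to the `ShardMapping`
    /// introspection collection, which must be registered before this call.
    /// A `diff` of 1 records a mapping; -1 retracts it.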
    #[instrument(level = "debug")]
    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
    where
        I: Iterator<Item = GlobalId>,
    {
        mz_ore::soft_assert_or_log!(
            diff == Diff::MINUS_ONE || diff == Diff::ONE,
            "use 1 for insert or -1 for delete"
        );

        let id = *self
            .introspection_ids
            .get(&IntrospectionType::ShardMapping)
            .expect("should be registered before this call");

        let mut updates = vec![];
        // Allocate a single `Row` buffer outside the loop and reuse it.
        let mut row_buf = Row::default();

        for global_id in global_ids {
            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
                collection.collection_metadata.data_shard.clone()
            } else {
                panic!("unknown global id: {}", global_id);
            };

            let mut packer = row_buf.packer();
            packer.push(Datum::from(global_id.to_string().as_str()));
            packer.push(Datum::from(shard_id.to_string().as_str()));
            updates.push((row_buf.clone(), diff));
        }

        self.collection_manager.differential_append(id, updates);
    }

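    /// Determines the set of collections that `self_id` depends on, based on
    /// its data source. Ingestions and ingestion exports depend on themselves
    /// and the relevant remap collection; sinks depend on themselves and
    /// their `from` collection.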
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependency = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => vec![],
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport { ingestion_id, .. } => {
                // Ingestion exports depend on their primary ingestion's remap
                // collection.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                vec![self_id, ingestion_remap_collection_id]
            }
            DataSource::Ingestion(ingestion) => {
                // Ingestions depend on their remap collection, as well as on
                // themselves.
                vec![self_id, ingestion.remap_collection_id]
            }
            DataSource::Sink { desc } => {
                // Sinks depend on their `from` collection, as well as on
                // themselves.
                vec![self_id, desc.sink.from]
            }
        };

        Ok(dependency)
    }

    async fn read_handle_for_snapshot(
        &self,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let metadata = self.storage_collections.collection_metadata(id)?;
        read_handle_for_snapshot(&self.persist, id, &metadata).await
    }

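    /// Appends the given status updates to the appropriate status history
    /// introspection collections. Does nothing in read-only mode.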
    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
        if self.read_only {
            return;
        }

        let mut sink_status_updates = vec![];
        let mut source_status_updates = vec![];

        for update in updates {
            let id = update.id;
            if self.export(id).is_ok() {
                sink_status_updates.push(update);
            } else if self.storage_collections.check_exists(id).is_ok() {
                source_status_updates.push(update);
            }
        }

        self.append_status_introspection_updates(
            IntrospectionType::SourceStatusHistory,
            source_status_updates,
        );
        self.append_status_introspection_updates(
            IntrospectionType::SinkStatusHistory,
            sink_status_updates,
        );
    }

    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
        self.collections
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))
    }

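    /// Starts the identified ingestion by sending a `RunIngestion` command to
    /// its storage instance.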
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich all of the source exports with their collection metadata.
        let mut source_exports = BTreeMap::new();
        for (
            export_id,
            SourceExport {
                storage_metadata: (),
                details,
                data_config,
            },
        ) in ingestion_description.source_exports.clone()
        {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details,
                    data_config,
                },
            );
        }

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            ingestion_metadata: collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }

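    /// Starts the identified export by sending a `RunSink` command to its
    /// storage instance.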
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Advance the as-of to at least the export's current read capability.
        // If the write frontier has already moved past the as-of, the sink
        // has emitted its snapshot before, so we must not request another.
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }

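    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the current frontiers against the previously
    /// recorded ones.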
    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }

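    /// Refreshes the wallclock lag tracking for all active collections:
    /// updates the per-collection maximum lag and lag metrics, and, if
    /// enabled, accumulates lag measurements into per-period histograms.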
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lags are reported as the maximum value, so they stand
            // out in the metrics.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
                continue;
            }

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }

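    /// Records the stashed wallclock lag measurements into the corresponding
    /// history and histogram introspection collections, if the recording
    /// interval has elapsed. Does nothing in read-only mode.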
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

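    /// Performs periodic maintenance work: refreshing the frontier
    /// introspection, wallclock lag tracking, and per-instance metrics.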
    fn maintain(&mut self) {
        self.update_frontier_introspection();
        self.refresh_wallclock_lag();

        for instance in self.instances.values_mut() {
            instance.refresh_state_metrics();
        }
    }
}

impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}

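/// Snapshots the contents of a statistics collection at the time just before
/// the given `upper`, returning no rows if nothing has been written yet.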
async fn snapshot_statistics<T>(
    id: GlobalId,
    upper: Antichain<T>,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Vec<Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    match upper.as_option() {
        Some(f) if f > &T::minimum() => {
            let as_of = f.step_back().unwrap();

            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
            snapshot
                .into_iter()
                .map(|(row, diff)| {
                    assert_eq!(diff, 1);
                    row
                })
                .collect()
        }
        // The collection is either closed or has never been written to;
        // there is nothing to read.
        _ => Vec::new(),
    }
}

async fn read_handle_for_snapshot<T>(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}

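/// State maintained about individual collections.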
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The source of this collection's data.
    pub data_source: DataSource<T>,

    pub collection_metadata: CollectionMetadata,

    pub extra_state: CollectionStateExtra<T>,

    /// Maximum frontier wallclock lag since the last `WallclockLagHistory`
    /// introspection update.
    wallclock_lag_max: WallclockLag,
    /// Frontier wallclock lag measurements stashed until the next
    /// `WallclockLagHistogram` introspection update, keyed by histogram
    /// period, lag bucket, and label set.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    /// Per-collection wallclock lag metrics.
    wallclock_lag_metrics: WallclockLagMetrics,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    fn new(
        data_source: DataSource<T>,
        collection_metadata: CollectionMetadata,
        extra_state: CollectionStateExtra<T>,
        wallclock_lag_metrics: WallclockLagMetrics,
    ) -> Self {
        // We don't maintain a wallclock lag histogram for `DataSource::Other`
        // collections.
        let wallclock_lag_histogram_stash = match &data_source {
            DataSource::Other => None,
            _ => Some(Default::default()),
        };

        Self {
            data_source,
            collection_metadata,
            extra_state,
            wallclock_lag_max: WallclockLag::MIN,
            wallclock_lag_histogram_stash,
            wallclock_lag_metrics,
        }
    }
}

/// Additional state the controller maintains for each collection type.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    Ingestion(IngestionState<T>),
    Export(ExportState<T>),
    None,
}

/// State the controller maintains for each ingestion.
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Read capabilities held on behalf of downstream consumers; their joint
    /// frontier drives hold downgrades.
    pub read_capabilities: MutableAntichain<T>,

    /// The since frontier, derived from `write_frontier` via `hold_policy`.
    pub derived_since: Antichain<T>,

    /// Read holds this ingestion has on its dependencies.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// The reported write frontier of the ingestion.
    pub write_frontier: Antichain<T>,

    /// The policy that determines how `derived_since` follows
    /// `write_frontier`.
    pub hold_policy: ReadPolicy<T>,

    /// The ID of the storage instance running this ingestion.
    pub instance_id: StorageInstanceId,

    /// The set of replicas on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}

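/// A description of a status history collection: how to keep it pruned, and
/// how to extract the key and event time from each row.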
struct StatusHistoryDesc<K> {
    retention_policy: StatusHistoryRetentionPolicy,
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}

enum StatusHistoryRetentionPolicy {
    /// Keep only the last N rows per key.
    LastN(usize),
    /// Keep only rows from the given time window.
    TimeWindow(Duration),
}

fn source_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_source_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn sink_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_sink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_privatelink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
    let desc = &REPLICA_STATUS_HISTORY_DESC;
    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
            params.replica_status_history_retention_window,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
                datums[process_idx].unwrap_uint64(),
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

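/// Replaces `from` with `replace_with` if the latter is beyond (or equal to)
/// the former, returning a `ChangeBatch` that records the swap: +1 for each
/// element of the new frontier, -1 for each element of the old one. If
/// `replace_with` is not beyond `from`, nothing changes and the returned
/// batch is empty.
///
/// A minimal sketch of the intended use, assuming `u64` timestamps:
///
/// ```ignore
/// let mut from = Antichain::from_elem(2u64);
/// let batch = swap_updates(&mut from, Antichain::from_elem(5u64));
/// // `from` is now {5}, and `batch` contains (5, +1) and (2, -1).
/// assert_eq!(from.elements(), &[5]);
/// ```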
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}