use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
    StatusUpdate, StorageCommand, StorageResponse, TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

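/// Tracks an in-flight oneshot ingestion: the callback used to report results
/// and the cluster the ingestion was scheduled on.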
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
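    /// Consumes the pending ingestion, notifying the waiting caller that the
    /// ingestion was canceled.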
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

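/// A storage controller for a storage instance.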
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    build_info: &'static BuildInfo,
    now: NowFn,

    read_only: bool,

    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    statistics_interval_sender: Sender<Duration>,

    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    initialized: bool,
    config: StorageConfiguration,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    metrics: StorageControllerMetrics,
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    wallclock_lag_last_recorded: DateTime<Utc>,

    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    migrated_storage_collections: BTreeSet<GlobalId>,

    maintenance_ticker: tokio::time::Interval,
    maintenance_scheduled: bool,

    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

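/// Spawns a background task that warms persist state for `shard_ids` by
/// opening batch fetchers for them, a bounded number at a time. The result is
/// held in `persist_warm_task` and dropped once bootstrap creates collections.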
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
{
    type Timestamp = T;

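    /// Marks the end of controller initialization: reconciles statistics left
    /// over from before a restart and forwards `InitializationComplete` to
    /// all storage instances.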
    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

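    /// Applies updated storage parameters to the storage collections, the
    /// persist client configuration, and every storage instance, then updates
    /// the controller's own config, statistics interval, and user batch
    /// duration.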
    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

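    /// Reports whether the identified collection is hydrated. Non-ingestion
    /// collections, and ingestions on clusters with no replicas, are treated
    /// as hydrated.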
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                Ok(ingestion_state.hydrated_on.len() >= 1)
            }
            CollectionStateExtra::Export(_) => Ok(true),
            CollectionStateExtra::None => Ok(true),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
            return Ok(true);
        }

        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    if &state.instance_id != target_cluster_id {
                        continue;
                    }
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => state.hydrated_on.len() >= 1,
                    }
                }
                CollectionStateExtra::Export(_) => true,
                CollectionStateExtra::None => true,
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
            }
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

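    /// Returns the since/upper frontiers for the given collections. Sinks are
    /// answered from controller-side export state (the since of the input
    /// hold and the sink's write frontier); everything else is answered by
    /// `storage_collections`.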
    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        let active_storage_collections: BTreeMap<_, _> = self
            .storage_collections
            .active_collection_frontiers()
            .into_iter()
            .map(|c| (c.id, c))
            .collect();

        let active_exports = self.instances[&instance_id]
            .active_ingestion_exports()
            .filter(move |id| {
                let frontiers = active_storage_collections.get(id);
                match frontiers {
                    Some(frontiers) => !frontiers.write_frontier.is_empty(),
                    None => false,
                }
            });

        Box::new(active_exports)
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

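    /// Creates a new storage instance, replaying the current initialization,
    /// read-only, and configuration state to it. Panics if an instance with
    /// `id` already exists.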
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            metrics,
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

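    /// Drops a replica from a storage instance. Objects that were running on
    /// the replica get `Paused` status updates (unless the controller is in
    /// read-only mode), and any pending drop acknowledgements from the
    /// replica are cleaned up.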
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

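    /// For each given collection, fetches the latest schema registered with
    /// persist and attempts to compare-and-evolve it to the bootstrapped
    /// `RelationDesc`, surfacing races and incompatible evolutions as errors.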
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

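    /// Creates the described collections: enriches each description with its
    /// persist metadata, opens write handles in parallel, builds
    /// per-collection controller state keyed by data source, registers tables
    /// with the table worker, and finally schedules ingestions and exports
    /// (subject to read-only restrictions).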
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        drop(self.persist_warm_task.take());

        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        let mut to_execute = BTreeSet::new();
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        to_register.sort_by_key(|(id, ..)| *id);

        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                    dependent ({id}): upper {:?} \n
                    dependency since {:?} \n
                    dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }
        }

        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table { .. }
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

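    /// Checks that the proposed `SourceDesc` is a compatible alteration of
    /// the identified ingestion's current description.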
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let source_collection = self.collection(ingestion_id)?;
        let data_source = &source_collection.data_source;
        match &data_source {
            DataSource::Ingestion(cur_ingestion) => {
                cur_ingestion
                    .desc
                    .alter_compatible(ingestion_id, source_desc)?;
            }
            o => {
                tracing::info!(
                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
                    o
                );
                Err(AlterError { id: ingestion_id })?
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_connections(source_connections.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (id, conn) in source_connections {
            let collection = self
                .collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.data_source {
                DataSource::Ingestion(ingestion) => {
                    if ingestion.desc.connection != conn {
                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                        ingestions_to_run.insert(id);
                    } else {
                        tracing::warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_export_data_configs(source_exports.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

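    /// Alters a table by registering `new_collection` against the same data
    /// shard with the evolved `new_desc`, then repoints the existing
    /// collection's `DataSource::Table` at the new primary.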
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if !matches!(existing.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            collection_desc.data_source.clone(),
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        self.collections.insert(new_collection, collection_state);
        let existing = self
            .collections
            .get_mut(&existing_collection)
            .expect("missing existing collection");
        assert!(matches!(
            existing.data_source,
            DataSource::Table { primary: None }
        ));
        existing.data_source = DataSource::Table {
            primary: Some(new_collection),
        };

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }

    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get(&id)
            .and_then(|c| match &c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get_mut(&id)
            .and_then(|c| match &mut c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

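    /// Starts a oneshot ingestion on the given storage instance, stashing the
    /// result callback so the ingestion can later be completed or canceled.
    /// Returns `StorageError::ReadOnly` when the controller may not write.
    ///
    /// A sketch of a call site (hypothetical ids and request), assuming the
    /// caller collects the resulting batches on a oneshot channel:
    /// ```ignore
    /// let (tx, rx) = tokio::sync::oneshot::channel();
    /// controller
    ///     .create_oneshot_ingestion(
    ///         uuid::Uuid::new_v4(),
    ///         collection_id,
    ///         instance_id,
    ///         request,
    ///         Box::new(move |results| {
    ///             let _ = tx.send(results);
    ///         }),
    ///     )
    ///     .await?;
    /// let batches = rx.await;
    /// ```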
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }

    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        pending.cancel();

        Ok(())
    }

    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot: new_description.sink.with_snapshot,
                to_storage_metadata,
            },
        };

        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }

    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }

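    /// Drops the given tables: genuine tables have their write handles
    /// dropped via the persist table worker (with the IDs queued for deferred
    /// cleanup once the drop completes), while webhook and ingestion-export
    /// collections are dropped through the source-dropping path.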
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
            .into_iter()
            .partition(|id| match self.collections[id].data_source {
                DataSource::Table { .. } => true,
                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                _ => panic!("identifier is not a table: {}", id),
            });

        if table_write_ids.len() > 0 {
            let drop_notif = self
                .persist_table_worker
                .drop_handles(table_write_ids.clone(), ts);
            let tx = self.pending_table_handle_drops_tx.clone();
            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
                drop_notif.await;
                for identifier in table_write_ids {
                    let _ = tx.send(identifier);
                }
            });
        }

        if data_source_ids.len() > 0 {
            self.validate_collection_ids(data_source_ids.iter().cloned())?;
            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
        }

        Ok(())
    }

    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_collection_ids(identifiers.iter().cloned())?;
        self.drop_sources_unvalidated(storage_metadata, identifiers)
    }

    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        ingestions_to_execute.insert(ingestion_id);

                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }

    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_export_ids(identifiers.iter().cloned())?;
        self.drop_sinks_unvalidated(storage_metadata, identifiers);
        Ok(())
    }

    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        let status_now = mz_ore::now::to_datetime((self.now)());

        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }

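    /// Appends `commands` to the given tables at `write_ts`, advancing their
    /// uppers to `advance_to`; the returned receiver resolves once the append
    /// is durable. In read-only mode, only writes to migrated system
    /// collections are allowed.
    ///
    /// A sketch of a call site (hypothetical table id and rows):
    /// ```ignore
    /// let ack = controller.append_table(
    ///     write_ts.clone(),
    ///     write_ts.step_forward(),
    ///     vec![(table_id, rows)],
    /// )?;
    /// ack.await.expect("table worker running")?;
    /// ```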
    #[instrument(level = "debug")]
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        if self.read_only {
            if !commands
                .iter()
                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
            {
                return Err(StorageError::ReadOnly);
            }
        }

        for (id, updates) in commands.iter() {
            if !updates.is_empty() {
                if !write_ts.less_than(&advance_to) {
                    return Err(StorageError::UpdateBeyondUpper(*id));
                }
            }
        }

        Ok(self
            .persist_table_worker
            .append(write_ts, advance_to, commands))
    }

    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }

    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
        let source_statistics = self.source_statistics.lock().expect("poisoned");
        source_statistics
            .webhook_statistics
            .get(&id)
            .cloned()
            .ok_or(StorageError::IdentifierMissing(id))
    }

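    /// Waits until the controller is ready to do work: returns immediately if
    /// maintenance is scheduled or table-handle drops are pending, otherwise
    /// waits for instance responses (draining any that are already queued) or
    /// the next maintenance tick.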
    async fn ready(&mut self) {
        if self.maintenance_scheduled {
            return;
        }

        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }

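    /// Processes the work enqueued since the last call: performs scheduled
    /// maintenance, rehydrates failed replicas, and applies stashed instance
    /// responses (frontier updates, drop acknowledgements, statistics, and
    /// status updates).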
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // Skip statistics for collections we no longer
                            // know about.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            // Skip statistics for collections we no longer
                            // know about.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => match collection.extra_state {
                                    CollectionStateExtra::Ingestion(
                                        ref mut ingestion_state,
                                    ) => {
                                        if ingestion_state.hydrated_on.is_empty() {
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                        }
                                        ingestion_state.hydrated_on.insert(replica_id.expect(
                                            "replica id should be present for status running",
                                        ));
                                    }
                                    CollectionStateExtra::Export(_) => {}
                                    CollectionStateExtra::None => {}
                                },
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => match collection.extra_state {
                                    CollectionStateExtra::Ingestion(
                                        ref mut ingestion_state,
                                    ) => {
                                        tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                        ingestion_state.hydrated_on.clear();
                                    }
                                    CollectionStateExtra::Export(_) => {}
                                    CollectionStateExtra::None => {}
                                },
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Tell the replicas to stop the now-complete
                                // oneshot ingestion.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                (pending.result_tx)(batches)
                            }
                            None => {}
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }

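    /// Returns the internal persist state of the collection's data shard as
    /// JSON, for debugging and introspection purposes.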
    async fn inspect_persist_state(
        &self,
        id: GlobalId,
    ) -> Result<serde_json::Value, anyhow::Error> {
        let collection = &self.storage_collections.collection_metadata(id)?;
        let client = self
            .persist
            .open(collection.persist_location.clone())
            .await?;
        let shard_state = client
            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
            .await?;
        let json_state = serde_json::to_value(shard_state)?;
        Ok(json_state)
    }

    fn append_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<(Row, Diff)>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates = updates.into_iter().map(|update| update.into()).collect();
        self.collection_manager.blind_write(id, updates);
    }

    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
        if !updates.is_empty() {
            self.collection_manager.blind_write(id, updates);
        }
    }

    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }

    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }

    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }

    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Only user sources can be read from with real-time recency.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold back the since of the remap shard for as long as we read
            // from it.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
                        .await.map_err(|e| {
                            tracing::debug!(?id, "real time recency error: {:?}", e);
                            e
                        });

                    // Release the read hold on the remap shard.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
}

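/// Prepares the given storage transaction for controller initialization by
/// allocating the txn-wal shard ID, if one has not been written yet. This
/// must run before [`Controller::new`]; a plausible call sequence (the
/// surrounding identifiers are assumptions for illustration):
///
/// ```ignore
/// prepare_initialization(&mut txn)?;
/// let controller = Controller::new(build_info, location, /* ... */, &txn, collections).await;
/// ```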
pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
    if txn.get_txn_wal_shard().is_none() {
        let txns_id = ShardId::new();
        txn.write_txn_wal_shard(txns_id)?;
    }

    Ok(())
}

impl<T> Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>,
    Self: StorageController<Timestamp = T>,
{
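    /// Creates a new storage controller.
    ///
    /// Opens the txn-wal shard (in read-only mode the table worker only
    /// follows the txns upper), starts the collection manager, and warms
    /// persist state for all known collections in the background. Panics if
    /// [`prepare_initialization`] was not called on `txn` first.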
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Warm up persist state for all known collections in the background.
        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }

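    /// Installs new read hold policies for the given ingestions and exports
    /// and applies any resulting downgrades of their derived sinces.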
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

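    /// Advances the write frontier of the identified collection, recomputes
    /// its derived since from the hold policy, and propagates any resulting
    /// read capability changes. Upper updates for already-dropped objects are
    /// ignored.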
    #[instrument(level = "debug", fields(updates))]
    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
        let mut read_capability_changes = BTreeMap::default();

        if let Some(collection) = self.collections.get_mut(&id) {
            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => (
                    &mut ingestion.write_frontier,
                    &mut ingestion.derived_since,
                    &ingestion.hold_policy,
                ),
                CollectionStateExtra::None => {
                    // Frontier updates for progress collections are expected
                    // and need no further handling here.
                    if !matches!(collection.data_source, DataSource::Progress) {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
                CollectionStateExtra::Export(export) => (
                    &mut export.write_frontier,
                    &mut export.derived_since,
                    &export.read_policy,
                ),
            };

            if PartialOrder::less_than(write_frontier, new_upper) {
                write_frontier.clone_from(new_upper);
            }

            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
            let mut update = swap_updates(derived_since, new_derived_since);
            if !update.is_empty() {
                read_capability_changes.insert(id, update);
            }
        } else if self.dropped_objects.contains_key(&id) {
            // Upper updates may still arrive for an object until all replicas
            // have acknowledged its drop; these are expected and ignored.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

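    /// Applies a batch of read capability changes, netting them per
    /// collection, downgrading dependency read holds, and sending
    /// `AllowCompaction` commands to the responsible instances.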
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the maximum id, and its updates.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }

    fn validate_collection_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.storage_collections.check_exists(id)?;
        }
        Ok(())
    }

    fn validate_export_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.export(id)?;
        }
        Ok(())
    }

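    /// Opens a write handle for the given shard, ensuring its cached upper is
    /// recent before returning it.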
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // Refresh the handle's cached upper before handing it out.
        write.fetch_recent_upper().await;

        write
    }

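    /// Registers the write handle for an introspection collection with the
    /// collection manager, as either a differential or an append-only
    /// collection depending on the introspection type (see the
    /// `From<&IntrospectionType> for CollectionManagerKind` impl below).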
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // In read-only mode, migrated system collections are the only
        // collections we may write to.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }

    fn reconcile_dangling_statistics(&self) {
        self.source_statistics
            .lock()
            .expect("poisoned")
            .source_statistics
            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
        self.sink_statistics
            .lock()
            .expect("poisoned")
            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
    }

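    /// Writes (global ID, shard ID) mappings to the `ShardMapping`
    /// introspection collection, with a `diff` of 1 for inserts and -1 for
    /// deletes. For example, a hypothetical caller registering a new
    /// collection would invoke it as:
    ///
    /// ```ignore
    /// self.append_shard_mappings([new_id].into_iter(), Diff::ONE);
    /// ```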
    #[instrument(level = "debug")]
    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
    where
        I: Iterator<Item = GlobalId>,
    {
        mz_ore::soft_assert_or_log!(
            diff == Diff::MINUS_ONE || diff == Diff::ONE,
            "use 1 for insert or -1 for delete"
        );

        let id = *self
            .introspection_ids
            .get(&IntrospectionType::ShardMapping)
            .expect("should be registered before this call");

        let mut updates = vec![];
        // Reuse one row allocation for packing.
        let mut row_buf = Row::default();

        for global_id in global_ids {
            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
                collection.collection_metadata.data_shard.clone()
            } else {
                panic!("unknown global id: {}", global_id);
            };

            let mut packer = row_buf.packer();
            packer.push(Datum::from(global_id.to_string().as_str()));
            packer.push(Datum::from(shard_id.to_string().as_str()));
            updates.push((row_buf.clone(), diff));
        }

        self.collection_manager.differential_append(id, updates);
    }

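    /// Determines the read dependencies of a collection from its data source:
    /// tables depend on their primary, ingestions and ingestion exports on
    /// the remap collection, and sinks on their source collection. A
    /// collection may also list itself as a dependency, holding back its own
    /// since.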
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependency = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => vec![],
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport { ingestion_id, .. } => {
                // Ingestion exports depend on their primary source's remap
                // collection.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                vec![self_id, ingestion_remap_collection_id]
            }
            DataSource::Ingestion(ingestion) => {
                // Ingestions depend on their remap collection.
                let mut dependencies = vec![self_id];
                if self_id != ingestion.remap_collection_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
                dependencies
            }
            DataSource::Sink { desc } => {
                vec![self_id, desc.sink.from]
            }
        };

        Ok(dependency)
    }

    async fn read_handle_for_snapshot(
        &self,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let metadata = self.storage_collections.collection_metadata(id)?;
        read_handle_for_snapshot(&self.persist, id, &metadata).await
    }

    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
        // Writing status updates is not allowed in read-only mode.
        if self.read_only {
            return;
        }

        let mut sink_status_updates = vec![];
        let mut source_status_updates = vec![];

        for update in updates {
            let id = update.id;
            if self.export(id).is_ok() {
                sink_status_updates.push(update);
            } else if self.storage_collections.check_exists(id).is_ok() {
                source_status_updates.push(update);
            }
        }

        self.append_status_introspection_updates(
            IntrospectionType::SourceStatusHistory,
            source_status_updates,
        );
        self.append_status_introspection_updates(
            IntrospectionType::SinkStatusHistory,
            sink_status_updates,
        );
    }

    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
        self.collections
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))
    }

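    /// Starts (or restarts) the identified ingestion: enriches its source
    /// exports and remap collection with storage metadata and sends a
    /// `RunIngestion` command to the hosting instance.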
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich all of the source exports with their storage metadata.
        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }

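    /// Starts (or restarts) the identified export: derives its as-of from the
    /// implied capability, decides whether a snapshot still needs to be
    /// emitted, and sends a `RunSink` command to the hosting instance.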
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // The export's as-of must be at least its implied capability. A
        // snapshot is only emitted if it was requested and the export's write
        // frontier has not already advanced past the as-of (in which case the
        // snapshot was committed previously).
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }

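    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the currently observed frontiers against the
    /// previously recorded ones and appending the resulting retractions and
    /// insertions.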
    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Diff the newly observed frontiers against the previously recorded
        // ones, emitting retractions and insertions as appropriate.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }

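    /// Observes the current wallclock lag of every active collection, updates
    /// per-collection maxima and metrics, and stashes histogram updates
    /// bucketed by power-of-two seconds. Lag is undefined for collections
    /// whose read frontier has caught up to their write frontier.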
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // A collection whose read frontier has caught up to its write
            // frontier is unreadable, so its lag is undefined.
            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lags are reported to metrics as the maximum value.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by powers of two.
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }

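    /// Records stashed wallclock lag measurements to the introspection
    /// collections, if the recording interval has elapsed since the last
    /// recording. Timestamps are truncated to interval boundaries to produce
    /// regular recording times. No-op in read-only mode.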
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

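    /// Runs periodic maintenance: refreshes frontier introspection, wallclock
    /// lag tracking, and per-instance state metrics.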
    fn maintain(&mut self) {
        self.update_frontier_introspection();
        self.refresh_wallclock_lag();

        for instance in self.instances.values_mut() {
            instance.refresh_state_metrics();
        }
    }
}

impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}

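/// Reads a snapshot of a statistics collection at the largest readable
/// timestamp, i.e. one step back from the given `upper`. Returns no rows if
/// the collection is empty or its upper is the empty antichain.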
async fn snapshot_statistics<T>(
    id: GlobalId,
    upper: Antichain<T>,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Vec<Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    match upper.as_option() {
        // If the collection is non-empty, read it at the largest readable
        // timestamp, which is one step back from the upper.
        Some(f) if f > &T::minimum() => {
            let as_of = f.step_back().unwrap();

            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
            snapshot
                .into_iter()
                .map(|(row, diff)| {
                    assert_eq!(diff, 1);
                    row
                })
                .collect()
        }
        _ => Vec::new(),
    }
}

async fn read_handle_for_snapshot<T>(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}

#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    pub data_source: DataSource<T>,

    pub collection_metadata: CollectionMetadata,

    pub extra_state: CollectionStateExtra<T>,

    /// Maximum wallclock lag observed since the last recording.
    wallclock_lag_max: WallclockLag,
    /// Stash of histogram updates, keyed by period, lag bucket, and labels,
    /// accumulated until the next recording.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    wallclock_lag_metrics: WallclockLagMetrics,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    fn new(
        data_source: DataSource<T>,
        collection_metadata: CollectionMetadata,
        extra_state: CollectionStateExtra<T>,
        wallclock_lag_metrics: WallclockLagMetrics,
    ) -> Self {
        // `DataSource::Other` collections don't track lag histograms here.
        let wallclock_lag_histogram_stash = match &data_source {
            DataSource::Other => None,
            _ => Some(Default::default()),
        };

        Self {
            data_source,
            collection_metadata,
            extra_state,
            wallclock_lag_max: WallclockLag::MIN,
            wallclock_lag_histogram_stash,
            wallclock_lag_metrics,
        }
    }
}

#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    Ingestion(IngestionState<T>),
    Export(ExportState<T>),
    None,
}

#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    pub read_capabilities: MutableAntichain<T>,

    pub derived_since: Antichain<T>,

    pub dependency_read_holds: Vec<ReadHold<T>>,

    pub write_frontier: Antichain<T>,

    pub hold_policy: ReadPolicy<T>,

    pub instance_id: StorageInstanceId,

    /// Replicas on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}

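/// Describes how to read and prune a status history collection: which
/// retention policy applies, and how to extract the key and event time from a
/// row's datums.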
struct StatusHistoryDesc<K> {
    retention_policy: StatusHistoryRetentionPolicy,
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}
enum StatusHistoryRetentionPolicy {
    /// Keep the last N updates for each key.
    LastN(usize),
    /// Keep all updates within the given time window.
    TimeWindow(Duration),
}

fn source_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_source_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn sink_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_sink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_privatelink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
    let desc = &REPLICA_STATUS_HISTORY_DESC;
    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
            params.replica_status_history_retention_window,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
                datums[process_idx].unwrap_uint64(),
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

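/// Replaces `from` with `replace_with` if the latter is beyond the former,
/// returning a `ChangeBatch` describing the change (+1 for each new element,
/// -1 for each replaced one). For example, swapping `{2}` in for `{1}` yields
/// the updates `[(2, +1), (1, -1)]`; if `replace_with` is not beyond `from`,
/// nothing changes and the batch is empty.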
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}