//! Implementation of the storage controller trait.

use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
    StatusUpdate, StorageCommand, StorageResponse, TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

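/// State tracked for a oneshot ingestion that is currently in flight, so that
/// its result callback can be invoked once the ingestion completes or is
/// canceled.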
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to report the result of the ingestion.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// The cluster the ingestion is running on.
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
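    /// Consumes the pending ingestion, notifying the caller that it was
    /// canceled.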
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

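/// A storage controller for a storage instance.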
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    /// The build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,

    /// Whether this controller is in read-only mode.
    ///
    /// When in read-only mode, neither this controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,

    /// Collections maintained by the storage controller.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Objects that have been dropped but for which we are still awaiting
    /// `DroppedId` responses from the given replicas.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Write handle workers for tables.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// Read-side handle for the txn-wal shard.
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    /// Responses received from instances, awaiting processing.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Oneshot ingestions that are currently in flight.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Tracks which collection is responsible for which introspection type.
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for the tasks that drive updating introspection collections.
    /// Dropping a token stops the corresponding task.
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    /// Consolidated source statistics, shared with the statistics tasks.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Consolidated sink statistics, keyed by collection and replica.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// A way to update the statistics interval in the statistics tasks.
    statistics_interval_sender: Sender<Duration>,

    /// Storage instances (clusters) that we send commands to.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Set to `true` once `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration, also applied to newly provisioned instances.
    config: StorageConfiguration,
    /// The persist location where all storage collections are written to.
    persist_location: PersistLocation,
    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,
    /// Metrics of the storage controller.
    metrics: StorageControllerMetrics,
    /// The (read, write) frontiers last recorded in the frontier
    /// introspection collections.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// The per-replica write frontiers last recorded in introspection.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function that computes the lag between the given time and wallclock
    /// time.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The last time wallclock lag introspection was recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to the [`StorageCollections`] implementation.
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Migrated storage collections that can be written even in read-only
    /// mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work was scheduled.
    maintenance_scheduled: bool,

    /// Shared transmit channel for replicas to send responses.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receive end for replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task that warms up persist state; dropped (and thereby
    /// aborted) once bootstrap is complete.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

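/// Warms up persist state for `shard_ids` in a background task.
///
/// The returned join handle is meant to be held (e.g. converted into an
/// [`AbortOnDropHandle`]) so that dropping it aborts any remaining warming
/// work once it is no longer useful. A minimal usage sketch (hypothetical
/// caller, for illustration):
///
/// ```ignore
/// let handle = warm_persist_state_in_background(client, shard_ids.into_iter());
/// let warm_task = Some(handle.abort_on_drop());
/// ```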
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    // Bound the number of shards that we warm at a single time, to limit our
    // resource use.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        // Dyncfg updates also need to be applied to the persist client cache.
        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

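    /// Reports whether the given collection is hydrated on at least one
    /// replica. Collections that are not ingestion-based, and ingestions on
    /// instances without replicas, are trivially considered hydrated.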
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on instances without replicas are always considered
            // hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                // An ingestion is hydrated if it is hydrated on at least one
                // replica.
                Ok(ingestion_state.hydrated_on.len() >= 1)
            }
            CollectionStateExtra::Export(_) => {
                // For now, exports are always considered hydrated.
                Ok(true)
            }
            CollectionStateExtra::None => {
                // For now, objects that are not ingestions are always
                // considered hydrated.
                Ok(true)
            }
        }
    }

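    /// Reports whether all relevant collections on the given cluster are
    /// hydrated on at least one of the targeted replicas, or on any replica
    /// if `target_replica_ids` is `None`. Transient collections and those in
    /// `exclude_collections` are ignored.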
    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
            // An empty set of target replicas is trivially hydrated.
            return Ok(true);
        }

        // If `target_replica_ids` is `Some`, check only those replicas;
        // otherwise check for hydration on any replica.
        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    if &state.instance_id != target_cluster_id {
                        continue;
                    }
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => {
                            // No target replicas, so check whether the
                            // ingestion is hydrated on any replica.
                            state.hydrated_on.len() >= 1
                        }
                    }
                }
                CollectionStateExtra::Export(_) => {
                    // For now, exports are always considered hydrated.
                    true
                }
                CollectionStateExtra::None => {
                    // For now, objects that are not ingestions are always
                    // considered hydrated.
                    true
                }
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
            }
            // We keep iterating instead of breaking out early, so that we log
            // all collections that are not hydrated.
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        // Exports have their frontiers tracked by the controller itself, while
        // all other collections live in `StorageCollections`. Handle the
        // exports first and let the rest fall through.
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        let active_storage_collections: BTreeMap<_, _> = self
            .storage_collections
            .active_collection_frontiers()
            .into_iter()
            .map(|c| (c.id, c))
            .collect();

        let active_exports = self.instances[&instance_id]
            .active_ingestion_exports()
            .filter(move |id| {
                let frontiers = active_storage_collections.get(id);
                match frontiers {
                    Some(frontiers) => !frontiers.write_frontier.is_empty(),
                    None => {
                        // Not an active collection, so not an active export.
                        false
                    }
                }
            });

        Box::new(active_exports)
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            metrics,
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

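    /// Drops the given replica from the given instance, recording `Paused`
    /// status updates for the sources and sinks that were running on it.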
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for id in instance.active_ingestions() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // Record a `Paused` status for all subsources of the ingestion,
            // excluding the remap collection, which does not get statuses.
            let subsource_ids = ingestion_description
                .collection_ids()
                .filter(|id| id != &ingestion_description.remap_collection_id);
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

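    /// For each given collection, evolves the persist schema of its shard to
    /// the given, new relation description, e.g. to pick up changes in column
    /// nullability during bootstrap.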
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

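    /// Creates the described collections: enriches them with persist
    /// metadata, opens their write handles, records their state, and, where
    /// applicable, registers them with the table worker, the collection
    /// manager, or their storage instance. Tables require a `register_ts`;
    /// in read-only mode, only migrated tables are registered.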
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // The set of collections is now known, so the persist warming task
        // has served its purpose; drop it to abort any remaining work.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state: dedup and reject
        // duplicate IDs with differing descriptions.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // Enrich each collection description with its persist metadata.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // Collections that are written through txn-wal carry the txns
                // shard in their metadata.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};
        // Reborrow `self` immutably so it can be used inside the concurrent
        // open futures below.
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            // Bound the number of shards we open concurrently.
            .buffer_unordered(50)
            // `try_collect` resolves as soon as the first error is hit, at
            // which point any still in-flight opens are simply dropped.
            .try_collect()
            .await?;

        // The set of ingestions/exports to execute after collection creation.
        let mut to_execute = BTreeSet::new();
        // The collections that we will be creating in this call.
        let mut new_collections = BTreeSet::new();
        // Tables that need to be registered with the table worker.
        let mut table_registers = Vec::with_capacity(to_register.len());

        to_register.sort_by_key(|(id, ..)| *id);

        // Split tables from other collections; tables are registered in
        // reverse ID order.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics entries to initialize for the new collections.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            // Sanity-check the relationship between the dependency frontiers
            // and our own write frontier, for collections that have
            // dependencies and are not written through txn-wal.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                    dependent ({id}): upper {:?} \n
                    dependency since {:?} \n
                    dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            // Perform data source-specific setup.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    // Webhook sources report their statistics through shared
                    // webhook statistics entries.
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the source export does nothing by itself;
                    // schedule the primary ingestion for execution instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }
        }

        // Register the tables all in one batch.
        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we only register migrated tables, which
                // we are allowed to write even while read-only.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // Execute the ingestions and exports we collected above.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table { .. }
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let source_collection = self.collection(ingestion_id)?;
        let data_source = &source_collection.data_source;
        match &data_source {
            DataSource::Ingestion(cur_ingestion) => {
                cur_ingestion
                    .desc
                    .alter_compatible(ingestion_id, source_desc)?;
            }
            o => {
                tracing::info!(
                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
                    o
                );
                Err(AlterError { id: ingestion_id })?
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Also have to let StorageCollections know!
        self.storage_collections
            .alter_ingestion_connections(source_connections.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (id, conn) in source_connections {
            let collection = self
                .collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is nothing to do.
                    if ingestion.desc.connection != conn {
                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                        ingestions_to_run.insert(id);
                    } else {
                        tracing::warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Also have to let StorageCollections know!
        self.storage_collections
            .alter_ingestion_export_data_configs(source_exports.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // Update the data config on the source export collection itself.
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Also update the data config on the ingestion that the export
            // belongs to.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // If the data config hasn't changed, there is nothing to do.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

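    /// Evolves the schema of an existing table: `new_collection` becomes the
    /// primary collection backed by the same data shard, and the existing
    /// collection is marked as replaced by it.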
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if !matches!(existing.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Also have to let StorageCollections know!
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // The new collection is a table, so it is written through txn-wal.
        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            collection_desc.data_source.clone(),
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        self.collections.insert(new_collection, collection_state);

        // Mark the existing collection as replaced by the new one.
        let existing = self
            .collections
            .get_mut(&existing_collection)
            .expect("missing existing collection");
        assert!(matches!(
            existing.data_source,
            DataSource::Table { primary: None }
        ));
        existing.data_source = DataSource::Table {
            primary: Some(new_collection),
        };

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }

    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get(&id)
            .and_then(|c| match &c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get_mut(&id)
            .and_then(|c| match &mut c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

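    /// Runs a oneshot ingestion on the given instance, staging batches for
    /// `collection_id`. The provided `result_tx` is invoked with the staged
    /// batches, or an error, once the ingestion completes.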
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }

    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        // Notify the caller that the ingestion was canceled.
        pending.cancel();

        Ok(())
    }

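    /// Updates the state of the export identified by `id` to match the new
    /// description and sends an updated `RunSink` command to its instance.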
    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        // Acquire read holds at StorageCollections to ensure that the
        // sinked collection is not dropped while we process this request.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Check whether the sink's write frontier is beyond the read hold we
        // just acquired.
        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot: new_description.sink.with_snapshot,
                to_storage_metadata,
            },
        };

        // Fetch the client for this export's cluster.
        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }

    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            // We stage the changes in `new_export_description` and apply all
            // updates in one go below, after all fallible operations.
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Ensure the new connection is compatible with the current sink.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    // We send the current read capability of the sink as the
                    // as-of. Note that the running sink on the cluster may
                    // have made additional progress by the time this command
                    // arrives.
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the client for this export's cluster.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update state only after all possible errors have occurred.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }

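    /// Drops the given table collections. Tables we write to directly have
    /// their write handles unregistered at `ts`, while source-fed tables
    /// (ingestion exports and webhooks) are dropped like sources.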
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Tables that are fed by source data (e.g. ingestion exports and
        // webhooks) are dropped differently from tables we write to directly.
        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
            .into_iter()
            .partition(|id| match self.collections[id].data_source {
                DataSource::Table { .. } => true,
                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                _ => panic!("identifier is not a table: {}", id),
            });

        // Drop the persist-backed tables by dropping their write handles.
        if !table_write_ids.is_empty() {
            let drop_notif = self
                .persist_table_worker
                .drop_handles(table_write_ids.clone(), ts);
            let tx = self.pending_table_handle_drops_tx.clone();
            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
                drop_notif.await;
                for identifier in table_write_ids {
                    let _ = tx.send(identifier);
                }
            });
        }

        // Drop the source-fed tables like sources.
        if !data_source_ids.is_empty() {
            self.validate_collection_ids(data_source_ids.iter().cloned())?;
            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
        }

        Ok(())
    }

    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_collection_ids(identifiers.iter().cloned())?;
        self.drop_sources_unvalidated(storage_metadata, identifiers)
    }

    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Ingestions whose source exports changed and therefore need to be
        // re-executed.
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        // Collections that are not ingestion-based and can simply be dropped.
        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Unregister the collection with the manager in the
                        // background.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping a source export requires modifying the
                        // ingestion it runs on and then re-executing it.
                        ingestions_to_execute.insert(ingestion_id);

                        // Adjust the source to remove this export.
                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            // Primary ingestion already dropped.
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        // Ingestion exports also have ingestion state that we
                        // need to drop.
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not re-execute ingestions that we are about to drop.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Release our read holds on the ingestions by installing a read
        // policy that allows their since to advance to the empty frontier.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Update the shard-mapping introspection for all dropped collections.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        // Remove the collection state.
        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            // If the object is still running on some replicas, remember it so
            // that we can handle the eventual `DroppedId` responses.
            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    // The remap collection of an ingestion is dropped
                    // alongside the ingestion itself.
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            self.dropped_objects.insert(
                                ingestion_desc.remap_collection_id,
                                active_replicas.clone(),
                            );
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }

    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_export_ids(identifiers.iter().cloned())?;
        self.drop_sinks_unvalidated(storage_metadata, identifiers);
        Ok(())
    }

    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        // Ignore exports that have already been removed.
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        // Release our read holds on the sinks by installing a read policy
        // that allows their since to advance to the empty frontier.
        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        // Record a `Dropped` status update and drop the statistics for each
        // sink.
        let status_now = mz_ore::now::to_datetime((self.now)());

        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        // Remove the export state.
        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            // If the export is still running on some replicas, remember it so
            // that we can handle the eventual `DroppedId` responses.
            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }

    #[instrument(level = "debug")]
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        if self.read_only {
            // While in read-only mode, only migrated system collections may
            // be written to.
            if !commands
                .iter()
                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
            {
                return Err(StorageError::ReadOnly);
            }
        }

        for (id, updates) in commands.iter() {
            // Non-empty updates must land strictly before `advance_to`.
            if !updates.is_empty() {
                if !write_ts.less_than(&advance_to) {
                    return Err(StorageError::UpdateBeyondUpper(*id));
                }
            }
        }

        Ok(self
            .persist_table_worker
            .append(write_ts, advance_to, commands))
    }

    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }

    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
        let source_statistics = self.source_statistics.lock().expect("poisoned");
        source_statistics
            .webhook_statistics
            .get(&id)
            .cloned()
            .ok_or(StorageError::IdentifierMissing(id))
    }

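    /// Waits until there is work for the controller to do: scheduled
    /// maintenance, pending table handle drops, or responses from instances.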
    async fn ready(&mut self) {
        if self.maintenance_scheduled {
            return;
        }

        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }

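    /// Processes work that has accumulated since the last call: performs
    /// maintenance, ingests stashed instance responses, and returns any
    /// frontier advancements to report to the caller.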
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // Don't record statistics for collections we are
                            // not (or no longer) aware of.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            // Don't record statistics for exports we are not
                            // (or no longer) aware of.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            if ingestion_state.hydrated_on.is_empty() {
                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                            }
                                            ingestion_state.hydrated_on.insert(replica_id.expect(
                                                "replica id should be present for status running",
                                            ));
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // Exports don't track hydration.
                                        }
                                        CollectionStateExtra::None => {
                                            // Nothing to do.
                                        }
                                    }
                                }
                                // It's possible that the collection was
                                // already dropped; ignore the update.
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            // A paused ingestion is no longer
                                            // hydrated on any replica.
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                            ingestion_state.hydrated_on.clear();
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // Exports don't track hydration.
                                        }
                                        CollectionStateExtra::None => {
                                            // Nothing to do.
                                        }
                                    }
                                }
                                // It's possible that the collection was
                                // already dropped; ignore the update.
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // Clean up the oneshot ingestion on the
                                // running cluster, if it still exists.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                // Deliver the staged batches to the caller.
                                (pending.result_tx)(batches)
                            }
                            None => {
                                // The ingestion has already been canceled.
                            }
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        // Process dropped tables in a single batch.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }

    async fn inspect_persist_state(
        &self,
        id: GlobalId,
    ) -> Result<serde_json::Value, anyhow::Error> {
        let collection = &self.storage_collections.collection_metadata(id)?;
        let client = self
            .persist
            .open(collection.persist_location.clone())
            .await?;
        let shard_state = client
            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
            .await?;
        let json_state = serde_json::to_value(shard_state)?;
        Ok(json_state)
    }

    fn append_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<(Row, Diff)>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates = updates.into_iter().map(|update| update.into()).collect();
        self.collection_manager.blind_write(id, updates);
    }

    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
        if !updates.is_empty() {
            self.collection_manager.blind_write(id, updates);
        }
    }

    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }

    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }

    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }

    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Only user sources can be read from with real-time recency.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                // Not a storage collection, so it has no RTR timestamp.
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    // Load generator sources have no upstream system to
                    // query for a real-time-recency timestamp.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                // Only ingestions can provide an RTR timestamp.
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold back the remap collection's since while we read from it.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
                        .await
                        .map_err(|e| {
                            tracing::debug!(?id, "real time recency error: {:?}", e);
                            e
                        });

                    // Keep the read hold alive until we're done reading.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
}

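/// Ensures the txn-wal shard ID is recorded in the given [`StorageTxn`],
/// generating a fresh shard ID if none exists yet.
///
/// This must be called before creating a [`Controller`].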
pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
    if txn.get_txn_wal_shard().is_none() {
        let txns_id = ShardId::new();
        txn.write_txn_wal_shard(txns_id)?;
    }

    Ok(())
}

impl<T> Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>,
    Self: StorageController<Timestamp = T>,
{
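    /// Creates a storage controller from the given configuration.
    ///
    /// Panics if the txn-wal shard was not installed by a prior call to
    /// [`prepare_initialization`].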
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Opening persist shards is expensive, so kick off a background task
        // that warms up the persist state for all known collections.
        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }

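    /// Installs the given hold policies for the identified ingestions and
    /// exports, downgrading read capabilities where the new policies allow
    /// it.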
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

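    /// Records the reported write frontier for the identified collection and
    /// re-derives its since from the installed hold policy, queueing any
    /// resulting read capability changes.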
    #[instrument(level = "debug", fields(updates))]
    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
        let mut read_capability_changes = BTreeMap::default();

        if let Some(collection) = self.collections.get_mut(&id) {
            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => (
                    &mut ingestion.write_frontier,
                    &mut ingestion.derived_since,
                    &ingestion.hold_policy,
                ),
                CollectionStateExtra::None => {
                    if matches!(collection.data_source, DataSource::Progress) {
                        // Progress collections manage their own frontiers,
                        // there is nothing for us to do here.
                    } else {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
                CollectionStateExtra::Export(export) => (
                    &mut export.write_frontier,
                    &mut export.derived_since,
                    &export.read_policy,
                ),
            };

            if PartialOrder::less_than(write_frontier, new_upper) {
                write_frontier.clone_from(new_upper);
            }

            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
            let mut update = swap_updates(derived_since, new_derived_since);
            if !update.is_empty() {
                read_capability_changes.insert(id, update);
            }
        } else if self.dropped_objects.contains_key(&id) {
            // The collection is dropped but replicas may still be reporting
            // frontier updates for it; ignore those.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

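    /// Applies a batch of read capability changes to ingestions and exports,
    /// downgrading their dependency read holds and informing the responsible
    /// instances via `AllowCompaction` commands.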
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        // Location to record consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the largest remaining ID and apply its updates;
        // changes may cascade to other entries in `updates`.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Translate the net changes into read hold downgrades and
        // `AllowCompaction` commands.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }


    fn validate_collection_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.storage_collections.check_exists(id)?;
        }
        Ok(())
    }

    fn validate_export_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.export(id)?;
        }
        Ok(())
    }

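    /// Opens a persist write handle for the given collection's data shard.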
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // Fetch the most recent upper for the write handle, so that users of
        // this handle start out with an up-to-date frontier.
        write.fetch_recent_upper().await;

        write
    }

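    /// Registers the given introspection collection with the collection
    /// manager, as either a differential or an append-only managed
    /// collection, depending on its [`IntrospectionType`].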
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // In read-only mode, we only allow writes to migrated collections.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }

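    /// Removes statistics for sources and sinks that no longer exist.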
    fn reconcile_dangling_statistics(&self) {
        self.source_statistics
            .lock()
            .expect("poisoned")
            .source_statistics
            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
        self.sink_statistics
            .lock()
            .expect("poisoned")
            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
    }

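    /// Appends updates for the `ShardMapping` introspection collection,
    /// mapping each given global ID to its data shard ID. `diff` must be
    /// `1` for insertions or `-1` for deletions.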
    #[instrument(level = "debug")]
    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
    where
        I: Iterator<Item = GlobalId>,
    {
        mz_ore::soft_assert_or_log!(
            diff == Diff::MINUS_ONE || diff == Diff::ONE,
            "use 1 for insert or -1 for delete"
        );

        let id = *self
            .introspection_ids
            .get(&IntrospectionType::ShardMapping)
            .expect("should be registered before this call");

        let mut updates = vec![];
        // Pack updates into rows, reusing a single row buffer.
        let mut row_buf = Row::default();

        for global_id in global_ids {
            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
                collection.collection_metadata.data_shard.clone()
            } else {
                panic!("unknown global id: {}", global_id);
            };

            let mut packer = row_buf.packer();
            packer.push(Datum::from(global_id.to_string().as_str()));
            packer.push(Datum::from(shard_id.to_string().as_str()));
            updates.push((row_buf.clone(), diff));
        }

        self.collection_manager.differential_append(id, updates);
    }

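    /// Determines the set of collections that the since of the identified
    /// collection depends on, based on its data source.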
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependency = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => vec![],
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport { ingestion_id, .. } => {
                // An ingestion export depends on the remap collection of its
                // parent ingestion.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                // Ingestion exports (and ingestions, below) also depend on
                // themselves, to keep their own sinces in check.
                vec![self_id, ingestion_remap_collection_id]
            }
            DataSource::Ingestion(ingestion) => {
                vec![self_id, ingestion.remap_collection_id]
            }
            DataSource::Sink { desc } => {
                vec![self_id, desc.sink.from]
            }
        };

        Ok(dependency)
    }

    async fn read_handle_for_snapshot(
        &self,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let metadata = self.storage_collections.collection_metadata(id)?;
        read_handle_for_snapshot(&self.persist, id, &metadata).await
    }

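    /// Appends the given updates to the source and sink status history
    /// collections. Does nothing in read-only mode.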
    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
        // Status history is only written by the active (read-write)
        // controller.
        if self.read_only {
            return;
        }

        let mut sink_status_updates = vec![];
        let mut source_status_updates = vec![];

        for update in updates {
            let id = update.id;
            if self.export(id).is_ok() {
                sink_status_updates.push(update);
            } else if self.storage_collections.check_exists(id).is_ok() {
                source_status_updates.push(update);
            }
        }

        self.append_status_introspection_updates(
            IntrospectionType::SourceStatusHistory,
            source_status_updates,
        );
        self.append_status_introspection_updates(
            IntrospectionType::SinkStatusHistory,
            sink_status_updates,
        );
    }

    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
        self.collections
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))
    }

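    /// Sends a `RunIngestion` command to the instance responsible for the
    /// identified ingestion, enriching the ingestion description with
    /// collection metadata for all source exports and the remap collection.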
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich all of the source exports with their collection metadata.
        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }

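    /// Sends a `RunSink` command for the identified export to the instance
    /// responsible for it.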
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // The sink's effective as-of is its nominal as-of joined with its
        // current read capability. If the output's write frontier is already
        // beyond that as-of, the snapshot has been written by a previous
        // incarnation of the sink, so it must not be emitted again.
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }

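    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the current collection frontiers against the
    /// previously recorded ones.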
    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }

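    /// Observes the current wallclock lag of all active collections, updates
    /// the corresponding metrics and per-collection maximums, and stashes
    /// histogram updates for the next introspection recording.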
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lags are reported to metrics as the maximum `u64`.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }

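    /// Records stashed wallclock lag introspection updates, if the recording
    /// interval has elapsed since the last recording. Does nothing in
    /// read-only mode.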
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

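    /// Performs periodic maintenance work.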
    fn maintain(&mut self) {
        self.update_frontier_introspection();
        self.refresh_wallclock_lag();

        for instance in self.instances.values_mut() {
            instance.refresh_state_metrics();
        }
    }
}

impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}

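/// Snapshots the contents of a statistics collection at the time right
/// before the given `upper`, returning no rows if the collection is not yet
/// readable.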
async fn snapshot_statistics<T>(
    id: GlobalId,
    upper: Antichain<T>,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Vec<Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    match upper.as_option() {
        Some(f) if f > &T::minimum() => {
            let as_of = f.step_back().unwrap();

            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
            snapshot
                .into_iter()
                .map(|(row, diff)| {
                    assert_eq!(diff, 1);
                    row
                })
                .collect()
        }
        // If the collection is closed or the frontier is the minimum, we
        // cannot read any state from it.
        _ => Vec::new(),
    }
}

async fn read_handle_for_snapshot<T>(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}

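/// State maintained by the storage controller about each known collection.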
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The data source backing this collection.
    pub data_source: DataSource<T>,

    pub collection_metadata: CollectionMetadata,

    pub extra_state: CollectionStateExtra<T>,

    /// Maximum wallclock lag observed since the last introspection recording.
    wallclock_lag_max: WallclockLag,
    /// Histogram buckets of observed wallclock lags, stashed here until the
    /// next introspection recording. `None` for collections that don't track
    /// a lag histogram.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    wallclock_lag_metrics: WallclockLagMetrics,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    fn new(
        data_source: DataSource<T>,
        collection_metadata: CollectionMetadata,
        extra_state: CollectionStateExtra<T>,
        wallclock_lag_metrics: WallclockLagMetrics,
    ) -> Self {
        // Collections with a `DataSource::Other` are not maintained by the
        // storage controller, so we don't stash histogram data for them.
        let wallclock_lag_histogram_stash = match &data_source {
            DataSource::Other => None,
            _ => Some(Default::default()),
        };

        Self {
            data_source,
            collection_metadata,
            extra_state,
            wallclock_lag_max: WallclockLag::MIN,
            wallclock_lag_histogram_stash,
            wallclock_lag_metrics,
        }
    }
}

#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    Ingestion(IngestionState<T>),
    Export(ExportState<T>),
    None,
}

#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Accumulation of read capabilities held for this ingestion.
    pub read_capabilities: MutableAntichain<T>,

    /// The since frontier derived from the hold policy and write frontier.
    pub derived_since: Antichain<T>,

    /// Read holds on this ingestion's dependencies.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// The most recently reported write frontier.
    pub write_frontier: Antichain<T>,

    /// The policy that determines how far `derived_since` may trail the
    /// write frontier.
    pub hold_policy: ReadPolicy<T>,

    /// The storage instance this ingestion runs on.
    pub instance_id: StorageInstanceId,

    /// The replicas on which this ingestion is currently hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}

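/// Describes how to extract keys and times from the rows of a status history
/// collection, along with the retention policy to enforce on it.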
struct StatusHistoryDesc<K> {
    retention_policy: StatusHistoryRetentionPolicy,
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}

enum StatusHistoryRetentionPolicy {
    /// Keep only the last N entries per key.
    LastN(usize),
    /// Keep only entries from the last time window.
    TimeWindow(Duration),
}

fn source_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_source_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn sink_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_sink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_privatelink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
    let desc = &REPLICA_STATUS_HISTORY_DESC;
    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
            params.replica_status_history_retention_window,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
                datums[process_idx].unwrap_uint64(),
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

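/// Replaces `from` with `replace_with` if the latter is in advance of the
/// former, returning a `ChangeBatch` that describes the net frontier change
/// (empty if no swap happened).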
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}