1use std::any::Any;
13use std::collections::btree_map;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use crate::collection_mgmt::{
21 AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
22};
23use crate::instance::{Instance, ReplicaConfig};
24use async_trait::async_trait;
25use chrono::{DateTime, DurationRound, TimeDelta, Utc};
26use derivative::Derivative;
27use differential_dataflow::lattice::Lattice;
28use futures::FutureExt;
29use futures::StreamExt;
30use itertools::Itertools;
31use mz_build_info::BuildInfo;
32use mz_cluster_client::client::ClusterReplicaLocation;
33use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
34use mz_cluster_client::{ReplicaId, WallclockLagFn};
35use mz_controller_types::dyncfgs::{
36 ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
37};
38use mz_ore::collections::CollectionExt;
39use mz_ore::metrics::MetricsRegistry;
40use mz_ore::now::NowFn;
41use mz_ore::task::AbortOnDropHandle;
42use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
43use mz_persist_client::batch::ProtoBatch;
44use mz_persist_client::cache::PersistClientCache;
45use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
46use mz_persist_client::critical::Opaque;
47use mz_persist_client::read::ReadHandle;
48use mz_persist_client::schema::CaESchema;
49use mz_persist_client::write::WriteHandle;
50use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::adt::timestamp::CheckedTimestamp;
53use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
54use mz_storage_client::client::{
55 AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
56 StatusUpdate, StorageCommand, StorageResponse, TableData,
57};
58use mz_storage_client::controller::{
59 BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
60 IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
61 StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
62};
63use mz_storage_client::healthcheck::{
64 MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
65 MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
66};
67use mz_storage_client::metrics::StorageControllerMetrics;
68use mz_storage_client::statistics::{
69 ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
70};
71use mz_storage_client::storage_collections::StorageCollections;
72use mz_storage_types::configuration::StorageConfiguration;
73use mz_storage_types::connections::ConnectionContext;
74use mz_storage_types::connections::inline::InlinedConnection;
75use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
76use mz_storage_types::errors::CollectionMissing;
77use mz_storage_types::instances::StorageInstanceId;
78use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
79use mz_storage_types::parameters::StorageParameters;
80use mz_storage_types::read_holds::ReadHold;
81use mz_storage_types::read_policy::ReadPolicy;
82use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
83use mz_storage_types::sources::{
84 GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
85 SourceExport, SourceExportDataConfig,
86};
87use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
88use mz_txn_wal::metrics::Metrics as TxnMetrics;
89use mz_txn_wal::txn_read::TxnsRead;
90use mz_txn_wal::txns::TxnsHandle;
91use timely::order::PartialOrder;
92use timely::progress::frontier::MutableAntichain;
93use timely::progress::{Antichain, ChangeBatch};
94use tokio::sync::watch::{Sender, channel};
95use tokio::sync::{mpsc, oneshot};
96use tokio::time::MissedTickBehavior;
97use tokio::time::error::Elapsed;
98use tracing::{debug, info, warn};
99
100mod collection_mgmt;
101mod history;
102mod instance;
103mod persist_handles;
104mod rtr;
105mod statistics;
106
/// Bookkeeping for a oneshot ingestion that has been started on a cluster but
/// whose result has not yet been delivered back to the requester.
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback invoked with the ingestion's results (or an error); opaque
    /// closure, hence excluded from the `Debug` impl.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// The storage instance (cluster) the ingestion was scheduled on.
    cluster_id: StorageInstanceId,
}
116
117impl PendingOneshotIngestion {
118 pub(crate) fn cancel(self) {
122 (self.result_tx)(vec![Err("canceled".to_string())])
123 }
124}
125
/// A storage controller for a storage instance.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller {
    /// Build information for this process.
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,

    /// Whether the controller is in read-only mode. When set, instances are
    /// never sent `AllowWrites` and status/introspection writes are skipped.
    read_only: bool,

    /// The state for every collection the controller knows about, keyed by id.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState>,

    /// For dropped objects, the set of replicas we are still tracking them
    /// on; entries are removed as the remaining replicas report/are dropped.
    /// NOTE(review): exact drain protocol not fully visible in this chunk.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Worker that handles table writes through persist.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker,
    /// Read-side handle to the txn-wal system.
    txns_read: TxnsRead<Timestamp>,
    /// Metrics for the txn-wal system.
    txns_metrics: Arc<TxnMetrics>,
    /// Storage responses received from instances, buffered for processing.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse)>,
    /// Sender for table ids whose write handles should be dropped.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    /// Receiver side of `pending_table_handle_drops_tx`.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Oneshot ingestions that are currently in flight, keyed by request uuid.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Manager for append-only and differential controller-managed
    /// collections (e.g. introspection and webhook collections).
    pub(crate) collection_manager: collection_mgmt::CollectionManager,

    /// Maps each introspection type to the id of its backing collection.
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens that keep introspection-related background tasks alive; dropping
    /// a token tears the corresponding task down.
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    /// Shared statistics state for sources (includes webhook statistics).
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Shared statistics state for sinks, keyed by (collection, replica).
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// Watch channel used to broadcast the statistics collection interval.
    statistics_interval_sender: Sender<Duration>,

    /// The storage instances (clusters) this controller manages.
    instances: BTreeMap<StorageInstanceId, Instance>,
    /// Whether `initialization_complete` has been called.
    initialized: bool,
    /// The current storage configuration.
    config: StorageConfiguration,
    /// The location of the persist store backing collections.
    persist_location: PersistLocation,
    /// Cache of persist clients, keyed by location.
    persist: Arc<PersistClientCache>,
    /// Metrics for this controller.
    metrics: StorageControllerMetrics,
    /// Last-recorded (read, write) frontiers per collection, used to emit
    /// frontier introspection updates as diffs.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    /// Last-recorded per-replica write frontiers, for the same purpose.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<Timestamp>>,

    /// Function computing the wallclock lag of a given frontier.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<Timestamp>,
    /// When wallclock lag was last recorded, used to pace recording.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Frontend for collection creation/frontier tracking, shared with other
    /// controllers.
    storage_collections: Arc<dyn StorageCollections + Send + Sync>,
    /// Collections that were migrated during bootstrap and may need special
    /// handling in read-only mode.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker driving periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether a maintenance pass is currently scheduled.
    maintenance_scheduled: bool,

    /// Sender given to instances so they can report responses back to us.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse)>,
    /// Receiver for responses reported by instances.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse)>,

    /// Background task pre-warming persist state; aborted on drop, and
    /// explicitly dropped once collections are created.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}
239
/// Spawns a background task that pre-warms persist state for the given shards
/// by opening a batch fetcher for each one (bounded concurrency), so that
/// later opens during bootstrap are faster.
///
/// The returned handle owns the fetchers (boxed as opaque `Debug` state);
/// dropping/aborting the task releases them.
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    // Bound on how many shards we warm concurrently, to limit load.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        // Erase the concrete fetcher type; callers only need to keep the
        // fetchers alive, not use them.
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}
274
275#[async_trait(?Send)]
276impl StorageController for Controller {
277 fn initialization_complete(&mut self) {
278 self.reconcile_dangling_statistics();
279 self.initialized = true;
280
281 for instance in self.instances.values_mut() {
282 instance.send(StorageCommand::InitializationComplete);
283 }
284 }
285
286 fn update_parameters(&mut self, config_params: StorageParameters) {
287 self.storage_collections
288 .update_parameters(config_params.clone());
289
290 self.persist.cfg().apply_from(&config_params.dyncfg_updates);
293
294 for instance in self.instances.values_mut() {
295 let params = Box::new(config_params.clone());
296 instance.send(StorageCommand::UpdateConfiguration(params));
297 }
298 self.config.update(config_params);
299 self.statistics_interval_sender
300 .send_replace(self.config.parameters.statistics_interval);
301 self.collection_manager.update_user_batch_duration(
302 self.config
303 .parameters
304 .user_storage_managed_collections_batch_duration,
305 );
306 }
307
    /// Returns the controller's current storage configuration.
    fn config(&self) -> &StorageConfiguration {
        &self.config
    }
312
    /// Returns the metadata (shards, relation desc, …) for the identified
    /// collection, delegating to the collections frontend.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        self.storage_collections.collection_metadata(id)
    }
316
317 fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, StorageError> {
318 let collection = self.collection(collection_id)?;
319
320 let instance_id = match &collection.data_source {
321 DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
322 DataSource::IngestionExport { ingestion_id, .. } => {
323 let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");
324
325 let instance_id = match &ingestion_state.data_source {
326 DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
327 _ => unreachable!("SourceExport must only refer to primary source"),
328 };
329
330 instance_id
331 }
332 _ => return Ok(true),
333 };
334
335 let instance = self.instances.get(&instance_id).ok_or_else(|| {
336 StorageError::IngestionInstanceMissing {
337 storage_instance_id: instance_id,
338 ingestion_id: collection_id,
339 }
340 })?;
341
342 if instance.replica_ids().next().is_none() {
343 return Ok(true);
346 }
347
348 match &collection.extra_state {
349 CollectionStateExtra::Ingestion(ingestion_state) => {
350 Ok(ingestion_state.hydrated_on.len() >= 1)
352 }
353 CollectionStateExtra::Export(_) => {
354 Ok(true)
359 }
360 CollectionStateExtra::None => {
361 Ok(true)
365 }
366 }
367 }
368
369 #[mz_ore::instrument(level = "debug")]
370 fn collections_hydrated_on_replicas(
371 &self,
372 target_replica_ids: Option<Vec<ReplicaId>>,
373 target_cluster_id: &StorageInstanceId,
374 exclude_collections: &BTreeSet<GlobalId>,
375 ) -> Result<bool, StorageError> {
376 if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
379 return Ok(true);
380 }
381
382 let target_replicas: Option<BTreeSet<ReplicaId>> =
385 target_replica_ids.map(|ids| ids.into_iter().collect());
386
387 let mut all_hydrated = true;
388 for (collection_id, collection_state) in self.collections.iter() {
389 if collection_id.is_transient() || exclude_collections.contains(collection_id) {
390 continue;
391 }
392 let hydrated = match &collection_state.extra_state {
393 CollectionStateExtra::Ingestion(state) => {
394 if &state.instance_id != target_cluster_id {
395 continue;
396 }
397 match &target_replicas {
398 Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
399 None => {
400 state.hydrated_on.len() >= 1
403 }
404 }
405 }
406 CollectionStateExtra::Export(_) => {
407 true
412 }
413 CollectionStateExtra::None => {
414 true
418 }
419 };
420 if !hydrated {
421 tracing::info!(%collection_id, "collection is not hydrated on any replica");
422 all_hydrated = false;
423 }
426 }
427 Ok(all_hydrated)
428 }
429
    /// Returns the `(read, write)` frontiers of the identified collection,
    /// as reported by the collections frontend.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>), CollectionMissing> {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        // The implied capability serves as the collection's read frontier.
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }
437
438 fn collections_frontiers(
439 &self,
440 mut ids: Vec<GlobalId>,
441 ) -> Result<Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>, CollectionMissing>
442 {
443 let mut result = vec![];
444 ids.retain(|&id| match self.export(id) {
449 Ok(export) => {
450 result.push((
451 id,
452 export.input_hold().since().clone(),
453 export.write_frontier.clone(),
454 ));
455 false
456 }
457 Err(_) => true,
458 });
459 result.extend(
460 self.storage_collections
461 .collections_frontiers(ids)?
462 .into_iter()
463 .map(|frontiers| {
464 (
465 frontiers.id,
466 frontiers.implied_capability,
467 frontiers.write_frontier,
468 )
469 }),
470 );
471
472 Ok(result)
473 }
474
    /// Returns the metadata of all active collections, delegating to the
    /// collections frontend.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }
478
479 fn active_ingestion_exports(
480 &self,
481 instance_id: StorageInstanceId,
482 ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
483 let active_storage_collections: BTreeMap<_, _> = self
484 .storage_collections
485 .active_collection_frontiers()
486 .into_iter()
487 .map(|c| (c.id, c))
488 .collect();
489
490 let active_exports = self.instances[&instance_id]
491 .active_ingestion_exports()
492 .filter(move |id| {
493 let frontiers = active_storage_collections.get(id);
494 match frontiers {
495 Some(frontiers) => !frontiers.write_frontier.is_empty(),
496 None => {
497 false
499 }
500 }
501 });
502
503 Box::new(active_exports)
504 }
505
    /// Checks that the identified collection exists, delegating to the
    /// collections frontend.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError> {
        self.storage_collections.check_exists(id)
    }
509
510 fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
511 let metrics = self.metrics.for_instance(id);
512 let mut instance = Instance::new(
513 workload_class,
514 metrics,
515 self.now.clone(),
516 self.instance_response_tx.clone(),
517 );
518 if self.initialized {
519 instance.send(StorageCommand::InitializationComplete);
520 }
521 if !self.read_only {
522 instance.send(StorageCommand::AllowWrites);
523 }
524
525 let params = Box::new(self.config.parameters.clone());
526 instance.send(StorageCommand::UpdateConfiguration(params));
527
528 let old_instance = self.instances.insert(id, instance);
529 assert_none!(old_instance, "storage instance {id} already exists");
530 }
531
532 fn drop_instance(&mut self, id: StorageInstanceId) {
533 let instance = self.instances.remove(&id);
534 assert!(instance.is_some(), "storage instance {id} does not exist");
535 }
536
537 fn update_instance_workload_class(
538 &mut self,
539 id: StorageInstanceId,
540 workload_class: Option<String>,
541 ) {
542 let instance = self
543 .instances
544 .get_mut(&id)
545 .unwrap_or_else(|| panic!("instance {id} does not exist"));
546
547 instance.workload_class = workload_class;
548 }
549
550 fn connect_replica(
551 &mut self,
552 instance_id: StorageInstanceId,
553 replica_id: ReplicaId,
554 location: ClusterReplicaLocation,
555 ) {
556 let instance = self
557 .instances
558 .get_mut(&instance_id)
559 .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));
560
561 let config = ReplicaConfig {
562 build_info: self.build_info,
563 location,
564 grpc_client: self.config.parameters.grpc_client.clone(),
565 };
566 instance.add_replica(replica_id, config);
567 }
568
    /// Drops a replica from a storage instance, updating drop-tracking state
    /// and emitting `Paused` status updates for the sources and sinks that
    /// were running on it.
    ///
    /// # Panics
    ///
    /// Panics if no instance with `instance_id` exists, or if the instance
    /// references an ingestion unknown to the controller.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        // Template for the `Paused` status update emitted for each object
        // that was running on the dropped replica.
        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            // Stop tracking this replica for any pending object drops; once
            // no replicas remain, forget the dropped object entirely.
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // For "old style" ingestions (where the ingestion id differs from
            // its remap collection id), skip the remap collection when
            // emitting status updates.
            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            // Same drop-tracking maintenance as for ingestions above.
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        // Status history is only written when we are allowed to write.
        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }
655
    /// During bootstrap, evolves the registered persist schema of each given
    /// collection to the provided `RelationDesc` (e.g. to pick up nullability
    /// changes), using persist's compare-and-evolve protocol.
    ///
    /// Collections without a registered schema are skipped.
    ///
    /// # Errors
    ///
    /// Returns `PersistSchemaEvolveRace` if another process evolved the
    /// schema concurrently, and `PersistInvalidSchemaEvolve` if the new
    /// schema is incompatible with the current one.
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError> {
        // Opening the configured persist location is expected to succeed;
        // failure here would be a bug, hence the unwrap.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            // Fetch the currently-registered schema, if any.
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            // Compare-and-evolve: only succeeds if `schema_id` is still the
            // latest schema, so concurrent evolutions are detected.
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    // Someone else evolved the schema first.
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }
722
    /// Creates the given collections during bootstrap: registers them with
    /// the collections frontend, opens persist write handles, builds
    /// per-collection controller state (ingestions, exports, tables,
    /// introspection, webhooks), registers tables with the table worker, and
    /// finally schedules the resulting ingestions and exports to run.
    ///
    /// `register_ts` must be provided when any of the collections is a table.
    ///
    /// # Errors
    ///
    /// Returns `CollectionIdReused` if an id appears twice with differing
    /// descriptions, plus any error from shard lookup, dependency
    /// resolution, or handle opening.
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        // Register everything with the collections frontend first.
        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts,
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // The warm task has served its purpose once we start opening real
        // handles; abort it by dropping its handle.
        drop(self.persist_warm_task.take());

        // De-duplicate exact duplicates, then reject ids that appear twice
        // with differing descriptions.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // Resolve each collection's shard and assemble its metadata.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard(id)?;

                // Collections managed by txn-wal share the txns shard.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // Opening the configured persist location is expected to succeed;
        // failure here would be a bug, hence the unwrap.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        // Open write handles for all collections with bounded concurrency.
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError>((id, description, write, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        let mut to_execute = BTreeSet::new();
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        // Restore a deterministic order after the unordered buffering above.
        to_register.sort_by_key(|(id, ..)| *id);

        // Process tables first (in reverse id order), then everything else.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        let mut new_webhook_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            // A collection lives in the txns system unless it is a migrated
            // collection while we are in read-only mode.
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            // Acquire read holds on all dependencies and derive the joint
            // since frontier from them.
            let storage_dependencies = self.determine_collection_dependencies(id, &description)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            let data_source = description.data_source;

            // Sanity-check frontier relationships between this collection
            // and its dependencies (not meaningful for txn tables or sinks).
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                if description.primary.is_none() {
                    mz_ore::soft_assert_or_log!(
                        write_frontier.elements() == &[Timestamp::MIN]
                            || write_frontier.is_empty()
                            || PartialOrder::less_than(&dependency_since, write_frontier),
                        "dependency since has advanced past dependent ({id}) upper \n
                        dependent ({id}): upper {:?} \n
                        dependency since {:?} \n
                        dependency read holds: {:?}",
                        write_frontier,
                        dependency_since,
                        dependency_read_holds,
                    );
                }
            }

            // Build per-data-source controller state and register the write
            // handle with the appropriate worker.
            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Splice the export into its primary ingestion's
                    // description and pick up that ingestion's instance.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Exports never run directly; schedule the primary
                    // ingestion instead.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Timestamp::MIN),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);
                }
                DataSource::Table => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Timestamp::MIN),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    // Sinks have exactly two dependencies: themselves and
                    // their input collection.
                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        // Seed statistics entries for newly-created webhook collections.
        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }

        }

        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, only migrated tables may be registered
                // with the table worker.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        // Record the id-to-shard mappings for the new collections.
        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        // Finally, kick off the dataflows for everything that should run.
        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    // Ingestions run in read-only mode only when 0dt sources
                    // are enabled and the connection supports it.
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }
1199
1200 fn check_alter_ingestion_source_desc(
1201 &mut self,
1202 ingestion_id: GlobalId,
1203 source_desc: &SourceDesc,
1204 ) -> Result<(), StorageError> {
1205 let source_collection = self.collection(ingestion_id)?;
1206 let data_source = &source_collection.data_source;
1207 match &data_source {
1208 DataSource::Ingestion(cur_ingestion) => {
1209 cur_ingestion
1210 .desc
1211 .alter_compatible(ingestion_id, source_desc)?;
1212 }
1213 o => {
1214 tracing::info!(
1215 "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
1216 o
1217 );
1218 Err(AlterError { id: ingestion_id })?
1219 }
1220 }
1221
1222 Ok(())
1223 }
1224
1225 async fn alter_ingestion_source_desc(
1226 &mut self,
1227 ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
1228 ) -> Result<(), StorageError> {
1229 let mut ingestions_to_run = BTreeSet::new();
1230
1231 for (id, new_desc) in ingestion_ids {
1232 let collection = self
1233 .collections
1234 .get_mut(&id)
1235 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1236
1237 match &mut collection.data_source {
1238 DataSource::Ingestion(ingestion) => {
1239 if ingestion.desc != new_desc {
1240 tracing::info!(
1241 from = ?ingestion.desc,
1242 to = ?new_desc,
1243 "alter_ingestion_source_desc, updating"
1244 );
1245 ingestion.desc = new_desc;
1246 ingestions_to_run.insert(id);
1247 }
1248 }
1249 o => {
1250 tracing::warn!("alter_ingestion_source_desc called on {:?}", o);
1251 Err(StorageError::IdentifierInvalid(id))?;
1252 }
1253 }
1254 }
1255
1256 for id in ingestions_to_run {
1257 self.run_ingestion(id)?;
1258 }
1259 Ok(())
1260 }
1261
1262 async fn alter_ingestion_connections(
1263 &mut self,
1264 source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1265 ) -> Result<(), StorageError> {
1266 let mut ingestions_to_run = BTreeSet::new();
1267
1268 for (id, conn) in source_connections {
1269 let collection = self
1270 .collections
1271 .get_mut(&id)
1272 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
1273
1274 match &mut collection.data_source {
1275 DataSource::Ingestion(ingestion) => {
1276 if ingestion.desc.connection != conn {
1279 tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
1280 ingestion.desc.connection = conn;
1281 ingestions_to_run.insert(id);
1282 } else {
1283 tracing::warn!(
1284 "update_source_connection called on {id} but the \
1285 connection was the same"
1286 );
1287 }
1288 }
1289 o => {
1290 tracing::warn!("update_source_connection called on {:?}", o);
1291 Err(StorageError::IdentifierInvalid(id))?;
1292 }
1293 }
1294 }
1295
1296 for id in ingestions_to_run {
1297 self.run_ingestion(id)?;
1298 }
1299 Ok(())
1300 }
1301
    /// Updates the `SourceExportDataConfig` of the given source exports
    /// (subsources/tables of an ingestion) and re-runs every ingestion whose
    /// export configuration actually changed.
    ///
    /// The config lives in two places — on the export's own collection state
    /// and inside the parent ingestion's `source_exports` map — and both are
    /// updated here.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError> {
        // Parent ingestions that need to be restarted after the loop.
        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // First update the export collection's own view of the config and
            // learn which ingestion it belongs to.
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Then mirror the change into the parent ingestion's state, which
            // is what actually drives the running dataflow.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // Only restart the ingestion if the config really changed.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }
1370
    /// Evolves a table's schema: registers `new_collection` with `new_desc`
    /// over the *same* persist data shard as `existing_collection`.
    ///
    /// Ordering matters here: the storage-collections layer is updated first,
    /// then a new write handle is opened, controller state is installed, and
    /// finally the table worker registers the new collection at `register_ts`.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Timestamp,
    ) -> Result<(), StorageError> {
        let data_shard = {
            // Destructure so we can borrow `collections` and
            // `storage_collections` independently.
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            // Only plain tables can have their descriptions altered.
            if existing.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            // Let the storage-collections layer perform (and validate) the
            // schema evolution against the expected version.
            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            // The new collection reuses the existing collection's data shard.
            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // Install controller-side state for the new collection.
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            // Tables are written through the txns system.
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            DataSource::Table,
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        self.collections.insert(new_collection, collection_state);

        // Hand the write handle to the table worker at the registration time.
        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        // Record the new id -> shard mapping in introspection.
        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }
1449
1450 fn export(&self, id: GlobalId) -> Result<&ExportState, StorageError> {
1451 self.collections
1452 .get(&id)
1453 .and_then(|c| match &c.extra_state {
1454 CollectionStateExtra::Export(state) => Some(state),
1455 _ => None,
1456 })
1457 .ok_or(StorageError::IdentifierMissing(id))
1458 }
1459
1460 fn export_mut(&mut self, id: GlobalId) -> Result<&mut ExportState, StorageError> {
1461 self.collections
1462 .get_mut(&id)
1463 .and_then(|c| match &mut c.extra_state {
1464 CollectionStateExtra::Export(state) => Some(state),
1465 _ => None,
1466 })
1467 .ok_or(StorageError::IdentifierMissing(id))
1468 }
1469
    /// Starts a oneshot ingestion on `instance_id` that stages batches for
    /// `collection_id`; the staged batches are eventually delivered through
    /// `result_tx` (see the `StagedBatches` handling in `process`).
    ///
    /// Fails with `ReadOnly` when the controller is in read-only mode.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError> {
        // Validate the target collection and cluster before doing anything.
        // NOTE: these lookups happen before the read-only check, so a missing
        // collection/cluster is reported even in read-only mode.
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            // Track the pending ingestion so responses/cancellation can find
            // the callback and the owning cluster.
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            // Ingestion UUIDs are expected to be unique.
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }
1511
    /// Cancels a pending oneshot ingestion: tells the owning cluster to stop
    /// it (if the cluster still exists) and invokes the pending callback's
    /// cancellation path.
    fn cancel_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) -> Result<(), StorageError> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        // Removing the entry also prevents `process` from later delivering
        // results for this ingestion.
        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                // The cluster disappearing while an ingestion is pending
                // indicates a bookkeeping bug; log loudly but keep going so
                // the callback is still cancelled.
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        pending.cancel();

        Ok(())
    }
1542
    /// Replaces the description of export `id` (possibly moving it to another
    /// cluster) and sends a fresh `RunSink` command to the target instance.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription,
    ) -> Result<(), StorageError> {
        let from_id = new_description.sink.from;

        // Acquire fresh read holds on the input collection and on the export
        // itself before swapping in the new state.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        let cur_export = self.export_mut(id)?;
        // The input must still be readable at every time the sink has yet to
        // process; otherwise the altered sink could not resume.
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        // Carry over capabilities/frontiers; only cluster and read holds
        // change here.
        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        // Emit a snapshot only if requested AND the sink has not already
        // progressed past its as-of (in which case the snapshot was already
        // written). Note `cur_export.write_frontier` equals the pre-swap
        // frontier — it was cloned into the new state above.
        let with_snapshot = new_description.sink.with_snapshot
            && !PartialOrder::less_than(&new_description.sink.as_of, &cur_export.write_frontier);

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: new_description.sink.commit_interval,
            },
        };

        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }
1619
    /// Swaps in new connections for the given sinks, sends updated `RunSink`
    /// commands grouped per instance, and only then updates controller state.
    ///
    /// The connection change must be `alter_compatible` with the existing sink
    /// description.
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError> {
        // Commands are batched per instance so each cluster is addressed once.
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand, ExportDescription)>>::new();

        for (id, connection) in exports {
            // Snapshot the current description and the as-of derived from the
            // sink's input read hold.
            let (mut new_export_description, as_of): (ExportDescription, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Only compatible alterations are allowed (e.g. no change of
            // output relation); bail before mutating any controller state.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    // Restart the sink from the hold's since, not the
                    // originally requested as-of.
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                    commit_interval: new_export_description.sink.commit_interval,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Commands were sent successfully; now record the new
            // descriptions in controller state.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }
1731
1732 fn drop_tables(
1747 &mut self,
1748 storage_metadata: &StorageMetadata,
1749 identifiers: Vec<GlobalId>,
1750 ts: Timestamp,
1751 ) -> Result<(), StorageError> {
1752 let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
1754 .into_iter()
1755 .partition(|id| match self.collections[id].data_source {
1756 DataSource::Table => true,
1757 DataSource::IngestionExport { .. } | DataSource::Webhook => false,
1758 _ => panic!("identifier is not a table: {}", id),
1759 });
1760
1761 if table_write_ids.len() > 0 {
1763 let drop_notif = self
1764 .persist_table_worker
1765 .drop_handles(table_write_ids.clone(), ts);
1766 let tx = self.pending_table_handle_drops_tx.clone();
1767 mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
1768 drop_notif.await;
1769 for identifier in table_write_ids {
1770 let _ = tx.send(identifier);
1771 }
1772 });
1773 }
1774
1775 if data_source_ids.len() > 0 {
1777 self.validate_collection_ids(data_source_ids.iter().cloned())?;
1778 self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
1779 }
1780
1781 Ok(())
1782 }
1783
1784 fn drop_sources(
1785 &mut self,
1786 storage_metadata: &StorageMetadata,
1787 identifiers: Vec<GlobalId>,
1788 ) -> Result<(), StorageError> {
1789 self.validate_collection_ids(identifiers.iter().cloned())?;
1790 self.drop_sources_unvalidated(storage_metadata, identifiers)
1791 }
1792
    /// Drops the given source-related collections without re-validating the
    /// ids. Classifies each id by data source, tears down the appropriate
    /// state, re-runs ingestions that lost a subsource, and finally forwards
    /// the drop to the storage-collections layer.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError> {
        // Ingestions that must be re-run because a subsource was removed.
        let mut ingestions_to_execute = BTreeSet::new();
        // Ingestions (and their exports) being dropped outright.
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        // Non-ingestion collections (webhooks, tables, progress, ...) to drop.
        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Unregistering from the collection manager may block;
                        // do it on a background task.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping a subsource changes the parent ingestion,
                        // which therefore needs to be re-run (unless it is
                        // itself being dropped; filtered below).
                        ingestions_to_execute.insert(ingestion_id);

                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        // These must go through their own drop paths.
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Don't re-run ingestions that are being dropped anyway.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Dropping happens by allowing compaction to the empty frontier.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Retract the id -> shard mappings from introspection.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        // Record a final `Dropped` status for each dropped ingestion.
        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            // Scope the statistics lock tightly.
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                // Track which replicas still have to acknowledge the drop so
                // later `DroppedId` responses can be matched up in `process`.
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            // The remap collection is dropped alongside its
                            // ingestion; track it too (unless it IS the id).
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }
1988
1989 fn drop_sinks(
1991 &mut self,
1992 storage_metadata: &StorageMetadata,
1993 identifiers: Vec<GlobalId>,
1994 ) -> Result<(), StorageError> {
1995 self.validate_export_ids(identifiers.iter().cloned())?;
1996 self.drop_sinks_unvalidated(storage_metadata, identifiers);
1997 Ok(())
1998 }
1999
    /// Drops the given sinks without re-validating that they are exports;
    /// ids that do not reference an export are silently skipped.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        // Keep only ids that actually are exports.
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        // Dropping happens by allowing compaction to the empty frontier.
        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        let status_now = mz_ore::now::to_datetime((self.now)());

        // Record a final `Dropped` status and drop per-sink statistics.
        let mut status_updates = vec![];
        {
            // Scope the statistics lock tightly.
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                // Track which replicas still have to acknowledge the drop so
                // later `DroppedId` responses can be matched up in `process`.
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }
2080
2081 #[instrument(level = "debug")]
2082 fn append_table(
2083 &mut self,
2084 write_ts: Timestamp,
2085 advance_to: Timestamp,
2086 commands: Vec<(GlobalId, Vec<TableData>)>,
2087 ) -> Result<tokio::sync::oneshot::Receiver<Result<(), StorageError>>, StorageError> {
2088 if self.read_only {
2089 if !commands
2092 .iter()
2093 .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
2094 {
2095 return Err(StorageError::ReadOnly);
2096 }
2097 }
2098
2099 for (id, updates) in commands.iter() {
2101 if !updates.is_empty() {
2102 if !write_ts.less_than(&advance_to) {
2103 return Err(StorageError::UpdateBeyondUpper(*id));
2104 }
2105 }
2106 }
2107
2108 Ok(self
2109 .persist_table_worker
2110 .append(write_ts, advance_to, commands))
2111 }
2112
2113 fn monotonic_appender(&self, id: GlobalId) -> Result<MonotonicAppender, StorageError> {
2114 self.collection_manager.monotonic_appender(id)
2115 }
2116
2117 fn webhook_statistics(&self, id: GlobalId) -> Result<Arc<WebhookStatistics>, StorageError> {
2118 let source_statistics = self.source_statistics.lock().expect("poisoned");
2120 source_statistics
2121 .webhook_statistics
2122 .get(&id)
2123 .cloned()
2124 .ok_or(StorageError::IdentifierMissing(id))
2125 }
2126
    /// Waits until there is work for `process` to do, returning immediately if
    /// work is already pending (scheduled maintenance or queued table drops).
    async fn ready(&mut self) {
        if self.maintenance_scheduled {
            return;
        }

        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            // Stash instance responses for `process`; drain everything that is
            // immediately available to batch work.
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }
2148
    /// Processes pending work: runs scheduled maintenance, rehydrates failed
    /// replicas, applies all stashed instance responses (frontier updates,
    /// drop acknowledgements, statistics, status updates, staged batches),
    /// drains completed table-handle drops, and returns any frontier updates.
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response>, anyhow::Error> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        // Take the stash so responses collected while we work don't interleave.
        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    // An object is fully dropped once every replica that was
                    // running it has acknowledged the drop.
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    // Source statistics.
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // Drop stats for collections we no longer know.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            // Stats are keyed per (collection, replica).
                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    // Sink statistics, analogous to the source case above.
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    // Maintain per-ingestion hydration bookkeeping before
                    // recording the status update itself.
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            // First replica to report Running
                                            // marks the ingestion hydrated.
                                            if ingestion_state.hydrated_on.is_empty() {
                                                tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                            }
                                            ingestion_state.hydrated_on.insert(replica_id.expect(
                                                "replica id should be present for status running",
                                            ));
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // Hydration is not tracked for exports.
                                        }
                                        CollectionStateExtra::None => {
                                            // No extra state to update.
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => {
                                    match collection.extra_state {
                                        CollectionStateExtra::Ingestion(
                                            ref mut ingestion_state,
                                        ) => {
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                            // A paused ingestion is no longer
                                            // hydrated on any replica.
                                            ingestion_state.hydrated_on.clear();
                                        }
                                        CollectionStateExtra::Export(_) => {
                                            // Hydration is not tracked for exports.
                                        }
                                        CollectionStateExtra::None => {
                                            // No extra state to update.
                                        }
                                    }
                                }
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // The ingestion produced its batches; tell the
                                // cluster it can stop, then deliver results.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                (pending.result_tx)(batches)
                            }
                            None => {
                                // Already cancelled; drop the batches.
                            }
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        // Drop collection state for tables whose handle drops have completed.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }
2409
2410 async fn inspect_persist_state(
2411 &self,
2412 id: GlobalId,
2413 ) -> Result<serde_json::Value, anyhow::Error> {
2414 let collection = &self.storage_collections.collection_metadata(id)?;
2415 let client = self
2416 .persist
2417 .open(collection.persist_location.clone())
2418 .await?;
2419 let shard_state = client
2420 .inspect_shard::<Timestamp>(&collection.data_shard)
2421 .await?;
2422 let json_state = serde_json::to_value(shard_state)?;
2423 Ok(json_state)
2424 }
2425
2426 fn append_introspection_updates(
2427 &mut self,
2428 type_: IntrospectionType,
2429 updates: Vec<(Row, Diff)>,
2430 ) {
2431 let id = self.introspection_ids[&type_];
2432 let updates = updates.into_iter().map(|update| update.into()).collect();
2433 self.collection_manager.blind_write(id, updates);
2434 }
2435
2436 fn append_status_introspection_updates(
2437 &mut self,
2438 type_: IntrospectionType,
2439 updates: Vec<StatusUpdate>,
2440 ) {
2441 let id = self.introspection_ids[&type_];
2442 let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2443 if !updates.is_empty() {
2444 self.collection_manager.blind_write(id, updates);
2445 }
2446 }
2447
2448 fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2449 let id = self.introspection_ids[&type_];
2450 self.collection_manager.differential_write(id, op);
2451 }
2452
2453 fn append_only_introspection_tx(
2454 &self,
2455 type_: IntrospectionType,
2456 ) -> mpsc::UnboundedSender<(
2457 Vec<AppendOnlyUpdate>,
2458 oneshot::Sender<Result<(), StorageError>>,
2459 )> {
2460 let id = self.introspection_ids[&type_];
2461 self.collection_manager.append_only_write_sender(id)
2462 }
2463
2464 fn differential_introspection_tx(
2465 &self,
2466 type_: IntrospectionType,
2467 ) -> mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)> {
2468 let id = self.introspection_ids[&type_];
2469 self.collection_manager.differential_write_sender(id)
2470 }
2471
    /// Returns a future resolving to the smallest timestamp at or beyond the
    /// current contents of every upstream system backing the given user
    /// sources (real-time recency). Non-user ids, unknown collections, and
    /// sources without an external upstream (e.g. load generators) are
    /// skipped. Each per-source probe races against `timeout`.
    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<Result<Timestamp, StorageError>>, StorageError> {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Only user sources can be read from directly by users.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            let collection = match self.collection(id) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }

                    // Load generators have no external upstream to probe.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            // Subscribe to the remap shard, which translates upstream offsets
            // into our timestamps.
            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold back compaction of the remap shard while the probe runs.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(
                        source_conn,
                        id,
                        config,
                        as_of,
                        remap_subscribe,
                    )
                    .await
                    .map_err(|e| {
                        tracing::debug!(?id, "real time recency error: {:?}", e);
                        e
                    });

                    // Keep the read hold alive for the entire probe.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        Ok(Box::pin(async move {
            // The overall RTR timestamp is the max over all probed sources; a
            // timeout on any source fails the whole request.
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(Timestamp::MIN, |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError>(std::cmp::max(curr, new))
                })
        }))
    }
2587
2588 fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2589 let Self {
2591 build_info: _,
2592 now: _,
2593 read_only,
2594 collections,
2595 dropped_objects,
2596 persist_table_worker: _,
2597 txns_read: _,
2598 txns_metrics: _,
2599 stashed_responses,
2600 pending_table_handle_drops_tx: _,
2601 pending_table_handle_drops_rx: _,
2602 pending_oneshot_ingestions,
2603 collection_manager: _,
2604 introspection_ids,
2605 introspection_tokens: _,
2606 source_statistics: _,
2607 sink_statistics: _,
2608 statistics_interval_sender: _,
2609 instances,
2610 initialized,
2611 config,
2612 persist_location,
2613 persist: _,
2614 metrics: _,
2615 recorded_frontiers,
2616 recorded_replica_frontiers,
2617 wallclock_lag: _,
2618 wallclock_lag_last_recorded,
2619 storage_collections: _,
2620 migrated_storage_collections,
2621 maintenance_ticker: _,
2622 maintenance_scheduled,
2623 instance_response_tx: _,
2624 instance_response_rx: _,
2625 persist_warm_task: _,
2626 } = self;
2627
2628 let collections: BTreeMap<_, _> = collections
2629 .iter()
2630 .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2631 .collect();
2632 let dropped_objects: BTreeMap<_, _> = dropped_objects
2633 .iter()
2634 .map(|(id, rs)| (id.to_string(), format!("{rs:?}")))
2635 .collect();
2636 let stashed_responses: Vec<_> =
2637 stashed_responses.iter().map(|r| format!("{r:?}")).collect();
2638 let pending_oneshot_ingestions: BTreeMap<_, _> = pending_oneshot_ingestions
2639 .iter()
2640 .map(|(uuid, i)| (uuid.to_string(), format!("{i:?}")))
2641 .collect();
2642 let introspection_ids: BTreeMap<_, _> = introspection_ids
2643 .iter()
2644 .map(|(typ, id)| (format!("{typ:?}"), id.to_string()))
2645 .collect();
2646 let instances: BTreeMap<_, _> = instances
2647 .iter()
2648 .map(|(id, i)| (id.to_string(), format!("{i:?}")))
2649 .collect();
2650 let recorded_frontiers: BTreeMap<_, _> = recorded_frontiers
2651 .iter()
2652 .map(|(id, fs)| (id.to_string(), format!("{fs:?}")))
2653 .collect();
2654 let recorded_replica_frontiers: Vec<_> = recorded_replica_frontiers
2655 .iter()
2656 .map(|((gid, rid), f)| (gid.to_string(), rid.to_string(), format!("{f:?}")))
2657 .collect();
2658 let migrated_storage_collections: Vec<_> = migrated_storage_collections
2659 .iter()
2660 .map(|id| id.to_string())
2661 .collect();
2662
2663 Ok(serde_json::json!({
2664 "read_only": read_only,
2665 "collections": collections,
2666 "dropped_objects": dropped_objects,
2667 "stashed_responses": stashed_responses,
2668 "pending_oneshot_ingestions": pending_oneshot_ingestions,
2669 "introspection_ids": introspection_ids,
2670 "instances": instances,
2671 "initialized": initialized,
2672 "config": format!("{config:?}"),
2673 "persist_location": format!("{persist_location:?}"),
2674 "recorded_frontiers": recorded_frontiers,
2675 "recorded_replica_frontiers": recorded_replica_frontiers,
2676 "wallclock_lag_last_recorded": format!("{wallclock_lag_last_recorded:?}"),
2677 "migrated_storage_collections": migrated_storage_collections,
2678 "maintenance_scheduled": maintenance_scheduled,
2679 }))
2680 }
2681}
2682
2683pub fn prepare_initialization(txn: &mut dyn StorageTxn) -> Result<(), StorageError> {
2690 if txn.get_txn_wal_shard().is_none() {
2691 let txns_id = ShardId::new();
2692 txn.write_txn_wal_shard(txns_id)?;
2693 }
2694
2695 Ok(())
2696}
2697
2698impl Controller
2699where
2700 Self: StorageController,
2701{
    /// Creates a new storage controller.
    ///
    /// # Panics
    ///
    /// Panics if `prepare_initialization` was not previously run against this
    /// durable state (the txn-WAL shard id must already exist), or if
    /// `persist_location` cannot be opened.
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<Timestamp>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn,
        storage_collections: Arc<dyn StorageCollections + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Warm persist state for all known collections in the background so
        // later opens are fast; abort-on-drop ties the task's lifetime to
        // this controller.
        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        // In read-only mode we must not take ownership of the txns shard, so
        // the table worker merely follows the txns upper; otherwise it opens
        // a full `TxnsHandle` and upgrades its version.
        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let mut txns = TxnsHandle::open(
                Timestamp::MIN,
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;
            txns.upgrade_version().await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        // Skip missed maintenance ticks rather than bursting to catch up.
        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        // Initial value for `wallclock_lag_last_recorded`.
        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }
2831
    /// Installs new hold (read) policies for the given collections and
    /// re-derives each collection's since accordingly, applying any resulting
    /// read-capability changes in one batch.
    ///
    /// Must only be called for ingestions and exports; hitting a collection
    /// with no extra state is `unreachable!`.
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy)>) {
        // Net read-capability deltas, applied together at the end.
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                // Derive the since implied by the new policy at the current
                // write frontier and record the delta from the old since.
                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }
2876
2877 #[instrument(level = "debug", fields(updates))]
2878 fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<Timestamp>) {
2879 let mut read_capability_changes = BTreeMap::default();
2880
2881 if let Some(collection) = self.collections.get_mut(&id) {
2882 let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2883 CollectionStateExtra::Ingestion(ingestion) => (
2884 &mut ingestion.write_frontier,
2885 &mut ingestion.derived_since,
2886 &ingestion.hold_policy,
2887 ),
2888 CollectionStateExtra::None => {
2889 if matches!(collection.data_source, DataSource::Progress) {
2890 } else {
2892 tracing::error!(
2893 ?collection,
2894 ?new_upper,
2895 "updated write frontier for collection which is not an ingestion"
2896 );
2897 }
2898 return;
2899 }
2900 CollectionStateExtra::Export(export) => (
2901 &mut export.write_frontier,
2902 &mut export.derived_since,
2903 &export.read_policy,
2904 ),
2905 };
2906
2907 if PartialOrder::less_than(write_frontier, new_upper) {
2908 write_frontier.clone_from(new_upper);
2909 }
2910
2911 let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2912 let mut update = swap_updates(derived_since, new_derived_since);
2913 if !update.is_empty() {
2914 read_capability_changes.insert(id, update);
2915 }
2916 } else if self.dropped_objects.contains_key(&id) {
2917 } else {
2920 soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
2921 }
2922
2923 if !read_capability_changes.is_empty() {
2924 self.update_hold_capabilities(&mut read_capability_changes);
2925 }
2926 }
2927
    /// Applies a batch of read-capability changes to ingestion/export
    /// collections, then downgrades the affected collections' dependency read
    /// holds and instructs the owning cluster instances to allow compaction
    /// up to the new frontiers.
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(
        &mut self,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<Timestamp>>,
    ) {
        // Per-collection net effect: (capability delta, resulting read
        // frontier, owning cluster id).
        let mut collections_net = BTreeMap::new();

        // Drain `updates` starting from the largest id.
        while let Some(key) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        // Apply the delta to the ingestion's capabilities and
                        // keep any resulting changes for the netting below.
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                            an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Apply the netted changes: downgrade dependency read holds to the
        // new frontier and send AllowCompaction to the owning instance.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                            ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }
3039
3040 fn validate_collection_ids(
3042 &self,
3043 ids: impl Iterator<Item = GlobalId>,
3044 ) -> Result<(), StorageError> {
3045 for id in ids {
3046 self.storage_collections.check_exists(id)?;
3047 }
3048 Ok(())
3049 }
3050
3051 fn validate_export_ids(&self, ids: impl Iterator<Item = GlobalId>) -> Result<(), StorageError> {
3053 for id in ids {
3054 self.export(id)?;
3055 }
3056 Ok(())
3057 }
3058
3059 async fn open_data_handles(
3067 &self,
3068 id: &GlobalId,
3069 shard: ShardId,
3070 relation_desc: RelationDesc,
3071 persist_client: &PersistClient,
3072 ) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
3073 let diagnostics = Diagnostics {
3074 shard_name: id.to_string(),
3075 handle_purpose: format!("controller data for {}", id),
3076 };
3077
3078 let mut write = persist_client
3079 .open_writer(
3080 shard,
3081 Arc::new(relation_desc),
3082 Arc::new(UnitSchema),
3083 diagnostics.clone(),
3084 )
3085 .await
3086 .expect("invalid persist usage");
3087
3088 write.fetch_recent_upper().await;
3097
3098 write
3099 }
3100
    /// Registers `id` as the collection backing `introspection_type` and
    /// hands its write handle to the matching collection-manager flavor
    /// (differential or append-only).
    ///
    /// # Panics
    ///
    /// Panics if an id was already registered for this introspection type, or
    /// if a force-writable collection is not a system collection.
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // Migrated builtin collections are written even in read-only mode.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        // Lazily opens a leased read handle on the collection's data shard;
        // used by the differential manager to read back current contents.
        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }
3213
3214 fn reconcile_dangling_statistics(&self) {
3217 self.source_statistics
3218 .lock()
3219 .expect("poisoned")
3220 .source_statistics
3221 .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
3223 self.sink_statistics
3224 .lock()
3225 .expect("poisoned")
3226 .retain(|(k, _replica_id), _| self.export(*k).is_ok());
3227 }
3228
3229 #[instrument(level = "debug")]
3239 fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3240 where
3241 I: Iterator<Item = GlobalId>,
3242 {
3243 mz_ore::soft_assert_or_log!(
3244 diff == Diff::MINUS_ONE || diff == Diff::ONE,
3245 "use 1 for insert or -1 for delete"
3246 );
3247
3248 let id = *self
3249 .introspection_ids
3250 .get(&IntrospectionType::ShardMapping)
3251 .expect("should be registered before this call");
3252
3253 let mut updates = vec![];
3254 let mut row_buf = Row::default();
3256
3257 for global_id in global_ids {
3258 let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3259 collection.collection_metadata.data_shard.clone()
3260 } else {
3261 panic!("unknown global id: {}", global_id);
3262 };
3263
3264 let mut packer = row_buf.packer();
3265 packer.push(Datum::from(global_id.to_string().as_str()));
3266 packer.push(Datum::from(shard_id.to_string().as_str()));
3267 updates.push((row_buf.clone(), diff));
3268 }
3269
3270 self.collection_manager.differential_append(id, updates);
3271 }
3272
    /// Determines the storage dependencies of the collection described by
    /// `collection_desc` (whose id is `self_id`): the set of collection ids
    /// this collection depends on, derived from its data source.
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        collection_desc: &CollectionDescription,
    ) -> Result<Vec<GlobalId>, StorageError> {
        let mut dependencies = Vec::new();

        // A collection with a primary incarnation depends on it.
        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            // These kinds carry no additional storage dependencies.
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport { ingestion_id, .. } => {
                // An ingestion export depends on itself and on the parent
                // ingestion's remap collection, which must already exist.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                dependencies.extend([self_id, ingestion_remap_collection_id]);
            }
            DataSource::Ingestion(ingestion) => {
                // An ingestion depends on itself and on its remap collection,
                // unless it is its own remap collection.
                dependencies.push(self_id);
                if self_id != ingestion.remap_collection_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => {
                // A sink depends on itself and on the collection it exports.
                dependencies.extend([self_id, desc.sink.from]);
            }
        };

        Ok(dependencies)
    }
3328
3329 async fn read_handle_for_snapshot(
3330 &self,
3331 id: GlobalId,
3332 ) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
3333 let metadata = self.storage_collections.collection_metadata(id)?;
3334 read_handle_for_snapshot(&self.persist, id, &metadata).await
3335 }
3336
3337 fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3340 if self.read_only {
3341 return;
3342 }
3343
3344 let mut sink_status_updates = vec![];
3345 let mut source_status_updates = vec![];
3346
3347 for update in updates {
3348 let id = update.id;
3349 if self.export(id).is_ok() {
3350 sink_status_updates.push(update);
3351 } else if self.storage_collections.check_exists(id).is_ok() {
3352 source_status_updates.push(update);
3353 }
3354 }
3355
3356 self.append_status_introspection_updates(
3357 IntrospectionType::SourceStatusHistory,
3358 source_status_updates,
3359 );
3360 self.append_status_introspection_updates(
3361 IntrospectionType::SinkStatusHistory,
3362 sink_status_updates,
3363 );
3364 }
3365
3366 fn collection(&self, id: GlobalId) -> Result<&CollectionState, StorageError> {
3367 self.collections
3368 .get(&id)
3369 .ok_or(StorageError::IdentifierMissing(id))
3370 }
3371
    /// Starts (or restarts) the ingestion identified by `id` by assembling a
    /// `RunIngestionCommand` — enriched with current collection metadata for
    /// all source exports and the remap collection — and sending it to the
    /// owning cluster instance.
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich each source export with its current storage metadata.
        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }
3426
    /// Starts (or restarts) the export (sink) identified by `id` by building
    /// a `RunSinkCommand` and sending it to the owning cluster instance.
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Advance the as-of to at least the implied capability, so we never
        // ask the sink to read below the collection's readable frontier.
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        // Only request a snapshot if one was asked for and the sink hasn't
        // already progressed past the as-of.
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
                commit_interval: description.sink.commit_interval,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }
3487
    /// Reconciles the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections with the currently observed collection frontiers, emitting
    /// only differential updates relative to the previously recorded state.
    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            // Find the instance running this collection (if any) so we can
            // record a per-replica write frontier for each of its replicas.
            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        // Row shape: (object_id, read_frontier, write_frontier); empty
        // frontiers render as NULL.
        let mut push_global_update =
            |id: GlobalId,
             (since, upper): (Antichain<Timestamp>, Antichain<Timestamp>),
             diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        // Row shape: (object_id, replica_id, write_frontier).
        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<Timestamp>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Diff the new state against the previously recorded state: retract
        // changed/removed entries, insert changed/new ones.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }
3588
    /// Refreshes per-collection wallclock-lag tracking: updates each
    /// collection's running maximum, observes the lag in metrics, and
    /// accumulates histogram counts, then possibly records an introspection
    /// batch via `maybe_record_wallclock_lag`.
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        // Lag of a frontier relative to wallclock time; an empty frontier
        // yields zero lag.
        let frontier_lag = |frontier: &Antichain<Timestamp>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(*ts),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // A collection whose write frontier hasn't passed its read
            // capabilities is not readable; report its lag as undefined.
            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // Undefined lag is reported to metrics as u64::MAX.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by the next power of two.
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                // Label the histogram entry with the instance's workload
                // class, if one is set.
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }
3663
    /// Records wallclock-lag history and histogram introspection updates, at
    /// most once per `WALLCLOCK_LAG_RECORDING_INTERVAL` boundary. No-op in
    /// read-only mode. Consumes (and resets) the per-collection lag maxima
    /// and histogram stashes it reads.
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        // Truncate a datetime down to the nearest interval boundary; `None`
        // if the interval can't be represented as a `TimeDelta`.
        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            // Fall back to the default interval if the configured one is
            // unusable.
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        // Record at most once per interval boundary.
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // Take (and reset) the max lag observed since the last recording.
            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            // Drain the histogram stash into introspection rows.
            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }
3746
3747 fn maintain(&mut self) {
3752 self.update_frontier_introspection();
3753 self.refresh_wallclock_lag();
3754
3755 for instance in self.instances.values_mut() {
3757 instance.refresh_state_metrics();
3758 }
3759 }
3760}
3761
3762impl From<&IntrospectionType> for CollectionManagerKind {
3763 fn from(value: &IntrospectionType) -> Self {
3764 match value {
3765 IntrospectionType::ShardMapping
3766 | IntrospectionType::Frontiers
3767 | IntrospectionType::ReplicaFrontiers
3768 | IntrospectionType::StorageSourceStatistics
3769 | IntrospectionType::StorageSinkStatistics
3770 | IntrospectionType::ComputeDependencies
3771 | IntrospectionType::ComputeOperatorHydrationStatus
3772 | IntrospectionType::ComputeMaterializedViewRefreshes
3773 | IntrospectionType::ComputeErrorCounts
3774 | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3775
3776 IntrospectionType::SourceStatusHistory
3777 | IntrospectionType::SinkStatusHistory
3778 | IntrospectionType::PrivatelinkConnectionStatusHistory
3779 | IntrospectionType::ReplicaStatusHistory
3780 | IntrospectionType::ReplicaMetricsHistory
3781 | IntrospectionType::WallclockLagHistory
3782 | IntrospectionType::WallclockLagHistogram
3783 | IntrospectionType::PreparedStatementHistory
3784 | IntrospectionType::StatementExecutionHistory
3785 | IntrospectionType::SessionHistory
3786 | IntrospectionType::StatementLifecycleHistory
3787 | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3788 }
3789 }
3790}
3791
3792async fn snapshot_statistics(
3798 id: GlobalId,
3799 upper: Antichain<Timestamp>,
3800 storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
3801) -> Vec<Row> {
3802 match upper.as_option() {
3803 Some(f) if f > &Timestamp::MIN => {
3804 let as_of = f.step_back().unwrap();
3805
3806 let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3807 snapshot
3808 .into_iter()
3809 .map(|(row, diff)| {
3810 assert_eq!(diff, 1);
3811 row
3812 })
3813 .collect()
3814 }
3815 _ => Vec::new(),
3818 }
3819}
3820
/// Opens a leased persist read handle suitable for snapshotting the
/// collection identified by `id`, using the shard and schema recorded in
/// `metadata`.
///
/// NOTE(review): despite the `Result` return type, this function currently
/// never returns `Err` — both fallible steps panic instead (`unwrap` /
/// `expect`). Confirm whether callers rely on the `Result` shape.
///
/// # Panics
///
/// Panics if the persist location cannot be opened, or if opening the
/// leased reader fails (treated as invalid persist usage).
async fn read_handle_for_snapshot(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            // Whether to read from the critical since; controlled by dyncfg.
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}
3850
/// Per-collection state maintained by the controller.
#[derive(Debug)]
struct CollectionState {
    /// Where this collection's data comes from.
    pub data_source: DataSource,

    /// Metadata for this collection (persist location, shard, schema, ...).
    pub collection_metadata: CollectionMetadata,

    /// Kind-specific extra state (ingestion, export, or none).
    pub extra_state: CollectionStateExtra,

    /// Maximum wallclock lag observed since the last introspection
    /// recording; swapped back to `WallclockLag::MIN` when recorded.
    wallclock_lag_max: WallclockLag,
    /// Stash of wallclock-lag histogram counts, keyed by
    /// `(period, lag bucket, labels)`, drained into the
    /// `WallclockLagHistogram` introspection collection when recorded.
    ///
    /// `None` for collections with `DataSource::Other`, which don't track
    /// histogram data (see `CollectionState::new`).
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    /// Metrics tracking this collection's wallclock lag.
    wallclock_lag_metrics: WallclockLagMetrics,
}
3882
3883impl CollectionState {
3884 fn new(
3885 data_source: DataSource,
3886 collection_metadata: CollectionMetadata,
3887 extra_state: CollectionStateExtra,
3888 wallclock_lag_metrics: WallclockLagMetrics,
3889 ) -> Self {
3890 let wallclock_lag_histogram_stash = match &data_source {
3894 DataSource::Other => None,
3895 _ => Some(Default::default()),
3896 };
3897
3898 Self {
3899 data_source,
3900 collection_metadata,
3901 extra_state,
3902 wallclock_lag_max: WallclockLag::MIN,
3903 wallclock_lag_histogram_stash,
3904 wallclock_lag_metrics,
3905 }
3906 }
3907}
3908
/// Kind-specific extra state attached to a `CollectionState`.
#[derive(Debug)]
enum CollectionStateExtra {
    /// State for collections fed by an ingestion.
    Ingestion(IngestionState),
    /// State for export (sink) collections.
    Export(ExportState),
    /// No extra state.
    None,
}
3916
/// State maintained for a single ingestion.
#[derive(Debug)]
struct IngestionState {
    /// Accumulation of read capabilities held on this ingestion's output.
    pub read_capabilities: MutableAntichain<Timestamp>,

    /// The since frontier derived from the read capabilities.
    pub derived_since: Antichain<Timestamp>,

    /// Read holds this ingestion keeps on its dependencies.
    pub dependency_read_holds: Vec<ReadHold>,

    /// The most recently reported write frontier of this ingestion.
    pub write_frontier: Antichain<Timestamp>,

    /// Policy determining how the controller's hold on this ingestion is
    /// downgraded — presumably relative to the write frontier; confirm
    /// against the code that applies it.
    pub hold_policy: ReadPolicy,

    /// The storage instance this ingestion runs on.
    pub instance_id: StorageInstanceId,

    /// The set of replicas on which this ingestion is hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}
3947
/// Describes a status-history introspection collection: how long entries
/// are retained and how to extract the key and event time from a row's
/// datums.
struct StatusHistoryDesc<K> {
    /// How entries of this collection are expired.
    retention_policy: StatusHistoryRetentionPolicy,
    /// Extracts the key of type `K` from a row's datums.
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    /// Extracts the event timestamp from a row's datums.
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}
/// Retention policy for a status-history collection.
enum StatusHistoryRetentionPolicy {
    /// Keep only the last N entries.
    LastN(usize),
    /// Keep only entries within the given time window.
    TimeWindow(Duration),
}
3963
3964fn source_status_history_desc(
3965 params: &StorageParameters,
3966) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3967 let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3968 let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3969 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3970 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3971
3972 StatusHistoryDesc {
3973 retention_policy: StatusHistoryRetentionPolicy::LastN(
3974 params.keep_n_source_status_history_entries,
3975 ),
3976 extract_key: Box::new(move |datums| {
3977 (
3978 GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3979 if datums[replica_id_idx].is_null() {
3980 None
3981 } else {
3982 Some(
3983 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3984 .expect("ReplicaId column"),
3985 )
3986 },
3987 )
3988 }),
3989 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3990 }
3991}
3992
3993fn sink_status_history_desc(
3994 params: &StorageParameters,
3995) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3996 let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3997 let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3998 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3999 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4000
4001 StatusHistoryDesc {
4002 retention_policy: StatusHistoryRetentionPolicy::LastN(
4003 params.keep_n_sink_status_history_entries,
4004 ),
4005 extract_key: Box::new(move |datums| {
4006 (
4007 GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
4008 if datums[replica_id_idx].is_null() {
4009 None
4010 } else {
4011 Some(
4012 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
4013 .expect("ReplicaId column"),
4014 )
4015 },
4016 )
4017 }),
4018 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4019 }
4020}
4021
4022fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
4023 let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
4024 let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
4025 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4026
4027 StatusHistoryDesc {
4028 retention_policy: StatusHistoryRetentionPolicy::LastN(
4029 params.keep_n_privatelink_status_history_entries,
4030 ),
4031 extract_key: Box::new(move |datums| {
4032 GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
4033 }),
4034 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4035 }
4036}
4037
4038fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
4039 let desc = &REPLICA_STATUS_HISTORY_DESC;
4040 let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
4041 let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
4042 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
4043
4044 StatusHistoryDesc {
4045 retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
4046 params.replica_status_history_retention_window,
4047 ),
4048 extract_key: Box::new(move |datums| {
4049 (
4050 GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
4051 datums[process_idx].unwrap_uint64(),
4052 )
4053 }),
4054 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
4055 }
4056}
4057
4058fn swap_updates(
4060 from: &mut Antichain<Timestamp>,
4061 mut replace_with: Antichain<Timestamp>,
4062) -> ChangeBatch<Timestamp> {
4063 let mut update = ChangeBatch::new();
4064 if PartialOrder::less_equal(from, &replace_with) {
4065 update.extend(replace_with.iter().map(|time| (*time, 1)));
4066 std::mem::swap(from, &mut replace_with);
4067 update.extend(replace_with.iter().map(|time| (*time, -1)));
4068 }
4069 update
4070}