use std::any::Any;
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, halt, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, RunIngestionCommand, RunOneshotIngestion, RunSinkCommand, Status,
    StatusUpdate, StorageCommand, StorageResponse, TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLag, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    ControllerSinkStatistics, ControllerSourceStatistics, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

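/// A oneshot ingestion that the controller has issued to a cluster but whose
/// results have not yet been returned to the caller.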
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    /// Callback used to return the staged batches (or errors) to the caller.
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    /// Cluster the ingestion is running on.
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
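    /// Consumes the pending ingestion, notifying the waiting caller that the
    /// ingestion was canceled.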
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

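/// A storage controller for a storage instance. Maintains the in-memory state
/// of all storage collections, brokers commands and responses between clients
/// and the storage clusters, and mediates access to durable state in persist.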
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    build_info: &'static BuildInfo,
    /// A function that returns the current time.
    now: NowFn,

    /// Whether this controller is in read-only mode.
    ///
    /// While in read-only mode, neither the controller nor the instances
    /// controlled by it are allowed to affect changes to external systems
    /// (largely persist).
    read_only: bool,

    /// The state for every collection known to this controller.
    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    /// Objects that have been dropped but for which we are still awaiting
    /// `DroppedId` responses from the listed replicas.
    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    /// Worker that handles writes to table shards.
    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    /// Read-side handle to the txn-wal subsystem.
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    /// Responses received from instances, stashed by `ready` and drained by
    /// `process`.
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Channel for notifying that a table's write handle has been dropped and
    /// its in-memory state can be cleaned up.
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    /// Oneshot ingestions that are currently running.
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    /// Interface for managed collections.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    /// Mapping from introspection type to the ID of its managed collection.
    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    /// Tokens for the tasks that drive updating introspection collections;
    /// dropping a token stops the corresponding task.
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    /// Per-(collection, replica) source statistics, as reported by replicas.
    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Per-(collection, replica) sink statistics, as reported by replicas.
    sink_statistics: Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// Broadcasts the current statistics collection interval.
    statistics_interval_sender: Sender<Duration>,

    /// The storage instances (clusters) controlled by this controller.
    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    /// Whether `initialization_complete` has been called.
    initialized: bool,
    /// Storage configuration, kept up to date by `update_parameters`.
    config: StorageConfiguration,
    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,
    /// A persist client used to interact with the storage collections.
    persist: Arc<PersistClientCache>,
    /// Metrics of the storage controller.
    metrics: StorageControllerMetrics,
    /// The (read, write) frontiers last recorded in introspection.
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    /// The per-replica write frontiers last recorded in introspection.
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    /// A function that computes the wallclock lag for a given frontier.
    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    /// The time when wallclock lag was last recorded.
    wallclock_lag_last_recorded: DateTime<Utc>,

    /// Handle to the shared storage collections state, which tracks
    /// since/upper frontiers and read holds.
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Collections that were migrated during bootstrap and are therefore
    /// exempt from the read-only restrictions.
    migrated_storage_collections: BTreeSet<GlobalId>,

    /// Ticker for scheduling periodic maintenance work.
    maintenance_ticker: tokio::time::Interval,
    /// Whether maintenance work is currently scheduled.
    maintenance_scheduled: bool,

    /// Sender over which instances forward replica responses to the controller.
    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    /// Receiver for the forwarded replica responses.
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    /// Background task that warms up persist state for shards we expect to
    /// open soon; dropped (aborting any remaining work) once the collections
    /// are actually created.
    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

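/// Warms up persist state for `shard_ids` in a background task.
///
/// The task creates batch fetchers for all the given shards, which loads and
/// caches their shard state. The returned handle is held in
/// `persist_warm_task` and dropped once the real collections are created.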
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    // Bound the number of shards that we warm at a single time, to limit our
    // memory use.
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        self.storage_collections.collection_metadata(id)
    }

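    /// A collection is considered hydrated when it is hydrated on at least one
    /// replica of the instance it runs on. Collections that don't run on an
    /// instance, or that run on an instance without replicas, are trivially
    /// hydrated.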
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            // Ingestions on instances without replicas are trivially hydrated.
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                Ok(!ingestion_state.hydrated_on.is_empty())
            }
            CollectionStateExtra::Export(_) => Ok(true),
            CollectionStateExtra::None => Ok(true),
        }
    }

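    /// Checks whether all relevant collections on `target_cluster_id` are
    /// hydrated on at least one of the given replicas, or on any replica when
    /// `target_replica_ids` is `None`.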
    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        // An empty set of target replicas is trivially hydrated.
        if target_replica_ids.as_ref().is_some_and(|v| v.is_empty()) {
            return Ok(true);
        }

        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    if &state.instance_id != target_cluster_id {
                        continue;
                    }
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => !state.hydrated_on.is_empty(),
                    }
                }
                CollectionStateExtra::Export(_) => true,
                CollectionStateExtra::None => true,
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
            }
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing> {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, CollectionMissing> {
        let mut result = vec![];
        // Exports are reported from the controller's own state, all other
        // collections from the shared `storage_collections`.
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

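    /// Returns the ingestion exports running on the given instance whose
    /// collections are still active, i.e. have a non-empty write frontier.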
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        let active_storage_collections: BTreeMap<_, _> = self
            .storage_collections
            .active_collection_frontiers()
            .into_iter()
            .map(|c| (c.id, c))
            .collect();

        let active_exports = self.instances[&instance_id]
            .active_ingestion_exports()
            .filter(move |id| {
                let frontiers = active_storage_collections.get(id);
                match frontiers {
                    Some(frontiers) => !frontiers.write_frontier.is_empty(),
                    None => false,
                }
            });

        Box::new(active_exports)
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            workload_class,
            metrics,
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for ingestion_id in instance.active_ingestions() {
            // The dropped replica can no longer send `DroppedId` responses,
            // so stop waiting for it.
            if let Some(active_replicas) = self.dropped_objects.get_mut(ingestion_id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(ingestion_id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(ingestion_id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            // Old-style ingestions have a dedicated remap collection, which
            // doesn't get a status update of its own.
            let old_style_ingestion = *ingestion_id != ingestion_description.remap_collection_id;
            let subsource_ids = ingestion_description.collection_ids().filter(|id| {
                let should_discard =
                    old_style_ingestion && id == &ingestion_description.remap_collection_id;
                !should_discard
            });
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

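    /// For each given collection, evolves the schema registered in persist to
    /// the provided `RelationDesc`, e.g. to mark columns as nullable that were
    /// previously non-nullable.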
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

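    /// Creates the described collections: opens their persist handles,
    /// registers them with the appropriate workers (table worker, collection
    /// manager), records their in-memory state, and finally schedules any
    /// ingestions and exports that should run.
    ///
    /// `register_ts` must be provided when any of the collections is a table.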
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        // We're about to open the real persist handles, so the warm-up task
        // has served its purpose; dropping it aborts any remaining work.
        drop(self.persist_warm_task.take());

        // Validate first, to avoid corrupting state. Exact duplicates are
        // fine, but the same ID with differing descriptions is an error.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        // Open the persist handles for all collections concurrently.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        // The set of collections that we should actually run, e.g. ingestions.
        let mut to_execute = BTreeSet::new();
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        to_register.sort_by_key(|(id, ..)| *id);

        // Process tables before all other collections, in reverse ID order.
        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        // Statistics entries that need to be initialized for new collections.
        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            to_execute.insert(id);
            new_collections.insert(id);

            let write_frontier = write.upper();

            let storage_dependencies = self.determine_collection_dependencies(id, &description)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            let data_source = description.data_source;

            // Check invariants relating a collection's write frontier to the
            // since of its dependencies.
            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                if dependency_since.is_empty() {
                    halt!(
                        "dependency since frontier is empty while dependent upper \
                        is not empty (dependent id={id}, write_frontier={:?}, dependency_read_holds={:?}), \
                        this indicates concurrent deletion of a collection",
                        write_frontier,
                        dependency_read_holds,
                    );
                }

                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                    dependent ({id}): upper {:?} \n
                    dependency since {:?} \n
                    dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    // Adjust the primary source to contain this export.
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    // Executing the primary source, not the export itself, is
                    // what gets this export running.
                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");

            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }
        }

        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                // In read-only mode, we only register migrated collections,
                // which are exempt from the read-only restriction.
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let source_collection = self.collection(ingestion_id)?;
        let data_source = &source_collection.data_source;
        match &data_source {
            DataSource::Ingestion(cur_ingestion) => {
                cur_ingestion
                    .desc
                    .alter_compatible(ingestion_id, source_desc)?;
            }
            o => {
                tracing::info!(
                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
                    o
                );
                Err(AlterError { id: ingestion_id })?
            }
        }

        Ok(())
    }

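    /// Alters the connection of each ingestion named in `source_connections`
    /// and re-runs any ingestion whose connection actually changed.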
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_connections(source_connections.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (id, conn) in source_connections {
            let collection = self
                .collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is no sense in
                    // re-rendering the dataflow.
                    if ingestion.desc.connection != conn {
                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                        ingestions_to_run.insert(id);
                    } else {
                        tracing::warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_export_data_configs(source_exports.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            // Update the data config on the export's own collection state.
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Also update the export as recorded in the primary source's
            // ingestion description.
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // If the data config hasn't changed, there is no sense in
                    // re-rendering the dataflow.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

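    /// Registers `new_collection` as a new version of `existing_collection`'s
    /// table: both point at the same data shard, but the new collection
    /// carries the new `RelationDesc`.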
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if existing.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            DataSource::Table,
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        self.collections.insert(new_collection, collection_state);

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        self.append_shard_mappings([new_collection].into_iter(), Diff::ONE);

        Ok(())
    }

    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get(&id)
            .and_then(|c| match &c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get_mut(&id)
            .and_then(|c| match &mut c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

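    /// Starts a oneshot ingestion on the specified cluster, registering a
    /// pending entry so the staged batches can later be returned (or the
    /// ingestion canceled) via `result_tx`.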
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }

    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        pending.cancel();

        Ok(())
    }

    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        // Acquire read holds at StorageCollections to ensure that the sinked
        // collection is not dropped while we are sinking it.
        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Check whether the sink's input is still readable at its current
        // write frontier; otherwise the restarted sink could not resume.
        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot: new_description.sink.with_snapshot,
                to_storage_metadata,
            },
        };

        // Fetch the client for this export's cluster.
        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }

    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            // Ensure the new sink description is a compatible alteration of
            // the current one.
            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            // Fetch the client for this export's cluster.
            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            // Update the in-memory state only after the commands were sent.
            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }

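    /// Drops the given table collections. True tables are dropped via the
    /// persist table worker (with handle cleanup deferred until the drop has
    /// been processed), while collections that merely present as tables
    /// (webhooks, ingestion exports) are dropped like sources.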
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
            .into_iter()
            .partition(|id| match self.collections[id].data_source {
                DataSource::Table => true,
                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                _ => panic!("identifier is not a table: {}", id),
            });

        if !table_write_ids.is_empty() {
            let drop_notif = self
                .persist_table_worker
                .drop_handles(table_write_ids.clone(), ts);
            let tx = self.pending_table_handle_drops_tx.clone();
            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
                drop_notif.await;
                for identifier in table_write_ids {
                    let _ = tx.send(identifier);
                }
            });
        }

        if !data_source_ids.is_empty() {
            self.validate_collection_ids(data_source_ids.iter().cloned())?;
            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
        }

        Ok(())
    }

    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_collection_ids(identifiers.iter().cloned())?;
        self.drop_sources_unvalidated(storage_metadata, identifiers)
    }

    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Ingestions (and their exports) are dropped differently from other
        // collections, so we sort them out first.
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        // Unregister the collection from the collection
                        // manager in a background task.
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        // Dropping an export removes it from the primary
                        // source's description, which must then be
                        // re-executed (unless the source is dropped too).
                        ingestions_to_execute.insert(ingestion_id);

                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        // Ingestion exports are dropped in the same way as
                        // ingestions.
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        // Do not re-execute ingestions that are about to be dropped.
        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        // Dropping a source is implemented by releasing its read holds: an
        // empty read policy allows the since to advance to the empty
        // frontier, at which point the instance drops the ingestion.
        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        // Remove the dropped collections from the shard-mapping introspection.
        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics
                    .source_statistics
                    .retain(|(stats_id, _), _| stats_id != &id);
                source_statistics
                    .webhook_statistics
                    .retain(|stats_id, _| stats_id != &id);
            }
        }

        // Remove collection state and, where replicas are still running the
        // objects, remember them in `dropped_objects` until they confirm the
        // drop.
        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    // We also expect a `DroppedId` response for an ingestion's
                    // remap collection, if it is distinct from the ingestion
                    // itself.
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            if *id != ingestion_desc.remap_collection_id {
                                self.dropped_objects.insert(
                                    ingestion_desc.remap_collection_id,
                                    active_replicas.clone(),
                                );
                            }
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }

    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_export_ids(identifiers.iter().cloned())?;
        self.drop_sinks_unvalidated(storage_metadata, identifiers);
        Ok(())
    }

    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        // Ignore exports that have already been removed.
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        // Dropping a sink is implemented by releasing its read holds: an
        // empty read policy allows the since to advance to the empty
        // frontier, at which point the instance drops the sink.
        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        let status_now = mz_ore::now::to_datetime((self.now)());

        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.retain(|(stats_id, _), _| stats_id != id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        // Remove export state and, where replicas are still running the
        // sinks, remember them in `dropped_objects`.
        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        // Also let StorageCollections know!
        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }

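    /// Appends `commands` to the given tables at `write_ts` and advances
    /// their uppers to `advance_to`. In read-only mode, only writes to
    /// migrated system collections are permitted.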
    #[instrument(level = "debug")]
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        if self.read_only {
            // In read-only mode, only writes to migrated system collections
            // are allowed.
            if !commands
                .iter()
                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
            {
                return Err(StorageError::ReadOnly);
            }
        }

        // A non-empty write requires the upper to actually advance past the
        // write timestamp.
        for (id, updates) in commands.iter() {
            if !updates.is_empty() {
                if !write_ts.less_than(&advance_to) {
                    return Err(StorageError::UpdateBeyondUpper(*id));
                }
            }
        }

        Ok(self
            .persist_table_worker
            .append(write_ts, advance_to, commands))
    }

    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }

    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
        let source_statistics = self.source_statistics.lock().expect("poisoned");
        source_statistics
            .webhook_statistics
            .get(&id)
            .cloned()
            .ok_or(StorageError::IdentifierMissing(id))
    }

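    /// Waits until there is work to do: stashed instance responses, pending
    /// table handle drops, or scheduled maintenance. `process` must be called
    /// afterwards to actually perform the work.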
    async fn ready(&mut self) {
        if self.maintenance_scheduled {
            return;
        }

        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                // Gather any other responses that are immediately available,
                // so they can be processed in one batch.
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }

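    /// Processes accumulated work: maintenance, replica rehydration, and the
    /// responses stashed by `ready` (frontier updates, drop confirmations,
    /// statistics, status updates, and staged batches).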
    #[instrument(level = "debug")]
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<T>>, anyhow::Error> {
        if self.maintenance_scheduled {
            self.maintain();
            self.maintenance_scheduled = false;
        }

        for instance in self.instances.values_mut() {
            instance.rehydrate_failed_replicas();
        }

        let mut status_updates = vec![];
        let mut updated_frontiers = BTreeMap::new();

        let stashed_responses = std::mem::take(&mut self.stashed_responses);
        for resp in stashed_responses {
            match resp {
                (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
                    self.update_write_frontier(id, &upper);
                    updated_frontiers.insert(id, upper);
                }
                (replica_id, StorageResponse::DroppedId(id)) => {
                    let replica_id = replica_id.expect("DroppedId from unknown replica");
                    if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
                        remaining_replicas.remove(&replica_id);
                        if remaining_replicas.is_empty() {
                            self.dropped_objects.remove(&id);
                        }
                    } else {
                        soft_panic_or_log!("unexpected DroppedId for {id}");
                    }
                }
                (replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?source_stats,
                                "missing replica_id for source statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.source_statistics.lock().expect("poisoned");

                        for stat in source_stats {
                            let collection_id = stat.id.clone();

                            // Don't record statistics for collections we have
                            // already forgotten about.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats
                                .source_statistics
                                .entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats = ControllerSourceStatistics::new(
                                        collection_id,
                                        Some(replica_id),
                                    );
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }

                    {
                        let replica_id = if let Some(replica_id) = replica_id {
                            replica_id
                        } else {
                            tracing::error!(
                                ?sink_stats,
                                "missing replica_id for sink statistics update"
                            );
                            continue;
                        };

                        let mut shared_stats = self.sink_statistics.lock().expect("poisoned");

                        for stat in sink_stats {
                            let collection_id = stat.id.clone();

                            // Don't record statistics for collections we have
                            // already forgotten about.
                            if self.collection(collection_id).is_err() {
                                continue;
                            }

                            let entry = shared_stats.entry((stat.id, Some(replica_id)));

                            match entry {
                                btree_map::Entry::Vacant(vacant_entry) => {
                                    let mut stats =
                                        ControllerSinkStatistics::new(collection_id, replica_id);
                                    stats.incorporate(stat);
                                    vacant_entry.insert(stats);
                                }
                                btree_map::Entry::Occupied(mut occupied_entry) => {
                                    occupied_entry.get_mut().incorporate(stat);
                                }
                            }
                        }
                    }
                }
                (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
                    match status_update.status {
                        Status::Running => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => match collection.extra_state {
                                    CollectionStateExtra::Ingestion(ref mut ingestion_state) => {
                                        if ingestion_state.hydrated_on.is_empty() {
                                            tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
                                        }
                                        ingestion_state.hydrated_on.insert(replica_id.expect(
                                            "replica id should be present for status running",
                                        ));
                                    }
                                    // Hydration status is only tracked for
                                    // ingestions.
                                    CollectionStateExtra::Export(_) => {}
                                    CollectionStateExtra::None => {}
                                },
                                None => (),
                            }
                        }
                        Status::Paused => {
                            let collection = self.collections.get_mut(&status_update.id);
                            match collection {
                                Some(collection) => match collection.extra_state {
                                    CollectionStateExtra::Ingestion(ref mut ingestion_state) => {
                                        tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
                                        ingestion_state.hydrated_on.clear();
                                    }
                                    // Hydration status is only tracked for
                                    // ingestions.
                                    CollectionStateExtra::Export(_) => {}
                                    CollectionStateExtra::None => {}
                                },
                                None => (),
                            }
                        }
                        _ => (),
                    }

                    if let Some(id) = replica_id {
                        status_update.replica_id = Some(id);
                    }
                    status_updates.push(status_update);
                }
                (_replica_id, StorageResponse::StagedBatches(batches)) => {
                    for (ingestion_id, batches) in batches {
                        match self.pending_oneshot_ingestions.remove(&ingestion_id) {
                            Some(pending) => {
                                // The oneshot ingestion has produced its
                                // batches; tell the instance to clean it up.
                                if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
                                {
                                    instance
                                        .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
                                }
                                (pending.result_tx)(batches)
                            }
                            // Nothing to do; the ingestion was presumably
                            // already canceled.
                            None => {}
                        }
                    }
                }
            }
        }

        self.record_status_updates(status_updates);

        // Process dropped table handles in a single batch.
        let mut dropped_table_ids = Vec::new();
        while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
            dropped_table_ids.push(dropped_id);
        }
        if !dropped_table_ids.is_empty() {
            self.drop_sources(storage_metadata, dropped_table_ids)?;
        }

        if updated_frontiers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Response::FrontierUpdates(
                updated_frontiers.into_iter().collect(),
            )))
        }
    }

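    /// Returns the state of the persist shard backing the identified
    /// collection, rendered as JSON.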
    async fn inspect_persist_state(
        &self,
        id: GlobalId,
    ) -> Result<serde_json::Value, anyhow::Error> {
        let collection = &self.storage_collections.collection_metadata(id)?;
        let client = self
            .persist
            .open(collection.persist_location.clone())
            .await?;
        let shard_state = client
            .inspect_shard::<Self::Timestamp>(&collection.data_shard)
            .await?;
        let json_state = serde_json::to_value(shard_state)?;
        Ok(json_state)
    }

    fn append_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<(Row, Diff)>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates = updates.into_iter().map(|update| update.into()).collect();
        self.collection_manager.blind_write(id, updates);
    }

    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    ) {
        let id = self.introspection_ids[&type_];
        let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
        if !updates.is_empty() {
            self.collection_manager.blind_write(id, updates);
        }
    }

    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write(id, op);
    }

    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.append_only_write_sender(id)
    }

    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )> {
        let id = self.introspection_ids[&type_];
        self.collection_manager.differential_write_sender(id)
    }

    async fn real_time_recent_timestamp(
        &self,
        timestamp_objects: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        use mz_storage_types::sources::GenericSourceConnection;

        let mut rtr_futures = BTreeMap::new();

        // Only user sources participate in real-time recency.
        for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
            // Skip objects that are not storage collections.
            let collection = match self.collection(id) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let (source_conn, remap_id) = match &collection.data_source {
                DataSource::Ingestion(IngestionDescription {
                    desc: SourceDesc { connection, .. },
                    remap_collection_id,
                    ..
                }) => match connection {
                    GenericSourceConnection::Kafka(_)
                    | GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => {
                        (connection.clone(), *remap_collection_id)
                    }
                    // Load generators have no external system to consult.
                    GenericSourceConnection::LoadGenerator(_) => continue,
                },
                // Skip non-ingestion collections.
                _ => {
                    continue;
                }
            };

            let config = self.config().clone();

            let read_handle = self.read_handle_for_snapshot(remap_id).await?;

            // Hold back the since of the remap shard while we read from it.
            let remap_read_hold = self
                .storage_collections
                .acquire_read_holds(vec![remap_id])
                .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
                .expect_element(|| "known to be exactly one");

            let remap_as_of = remap_read_hold
                .since()
                .to_owned()
                .into_option()
                .ok_or(StorageError::ReadBeforeSince(remap_id))?;

            rtr_futures.insert(
                id,
                tokio::time::timeout(timeout, async move {
                    use mz_storage_types::sources::SourceConnection as _;

                    let as_of = Antichain::from_elem(remap_as_of);
                    let remap_subscribe = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;

                    tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");

                    let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
                        .await
                        .map_err(|e| {
                            tracing::debug!(?id, "real time recency error: {:?}", e);
                            e
                        });

                    // Keep the read hold alive for the duration of the query.
                    drop(remap_read_hold);

                    result
                }),
            );
        }

        Ok(Box::pin(async move {
            let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
            ids.into_iter()
                .zip_eq(futures::future::join_all(futs).await)
                .try_fold(T::minimum(), |curr, (id, per_source_res)| {
                    let new =
                        per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
                    Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
                })
        }))
    }
}

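/// Prepares a [`StorageTxn`] for use with a storage controller by ensuring
/// that a txn-wal shard is allocated. Idempotent: an existing shard ID is
/// left in place.
///
/// A sketch of the expected call order (illustrative, not a doctest):
///
/// ```ignore
/// prepare_initialization(&mut txn)?;
/// let controller = Controller::new(/* ... */, &txn, storage_collections).await;
/// ```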
pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
    if txn.get_txn_wal_shard().is_none() {
        let txns_id = ShardId::new();
        txn.write_txn_wal_shard(txns_id)?;
    }

    Ok(())
}

impl<T> Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>,
    Self: StorageController<Timestamp = T>,
{
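    /// Creates a new storage controller.
    ///
    /// # Panics
    ///
    /// Panics if [`prepare_initialization`] was not called on `txn` first,
    /// since the controller requires the txn-wal shard to already exist.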
    pub async fn new(
        build_info: &'static BuildInfo,
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        now: NowFn,
        wallclock_lag: WallclockLagFn<T>,
        txns_metrics: Arc<TxnMetrics>,
        read_only: bool,
        metrics_registry: &MetricsRegistry,
        controller_metrics: ControllerMetrics,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
        storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    ) -> Self {
        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Warm up persist state for all known collections in the background.
        let persist_warm_task = warm_persist_state_in_background(
            txns_client.clone(),
            txn.get_collection_metadata().into_values(),
        );
        let persist_warm_task = Some(persist_warm_task.abort_on_drop());

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating storage controller");

        let persist_table_worker = if read_only {
            let txns_write = txns_client
                .open_writer(
                    txns_id,
                    Arc::new(TxnsCodecRow::desc()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: "txns".to_owned(),
                        handle_purpose: "follow txns upper".to_owned(),
                    },
                )
                .await
                .expect("txns schema shouldn't change");
            persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
        } else {
            let mut txns = TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;
            txns.upgrade_version().await;
            persist_handles::PersistTableWriteWorker::new_txns(txns)
        };
        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());

        let introspection_ids = BTreeMap::new();
        let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));

        let (statistics_interval_sender, _) =
            channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);

        let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
            tokio::sync::mpsc::unbounded_channel();

        let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
        maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();

        let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);

        let now_dt = mz_ore::now::to_datetime(now());

        Self {
            build_info,
            collections: BTreeMap::default(),
            dropped_objects: Default::default(),
            persist_table_worker,
            txns_read,
            txns_metrics,
            stashed_responses: vec![],
            pending_table_handle_drops_tx,
            pending_table_handle_drops_rx,
            pending_oneshot_ingestions: BTreeMap::default(),
            collection_manager,
            introspection_ids,
            introspection_tokens,
            now,
            read_only,
            source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
                source_statistics: BTreeMap::new(),
                webhook_statistics: BTreeMap::new(),
            })),
            sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
            statistics_interval_sender,
            instances: BTreeMap::new(),
            initialized: false,
            config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
            persist_location,
            persist: persist_clients,
            metrics,
            recorded_frontiers: BTreeMap::new(),
            recorded_replica_frontiers: BTreeMap::new(),
            wallclock_lag,
            wallclock_lag_last_recorded: now_dt,
            storage_collections,
            migrated_storage_collections: BTreeSet::new(),
            maintenance_ticker,
            maintenance_scheduled: false,
            instance_response_rx,
            instance_response_tx,
            persist_warm_task,
        }
    }

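    /// Installs the given hold policies on the identified collections and
    /// applies any resulting read capability changes.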
    #[instrument(level = "debug")]
    fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            if let Some(collection) = self.collections.get_mut(&id) {
                let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
                {
                    CollectionStateExtra::Ingestion(ingestion) => (
                        ingestion.write_frontier.borrow(),
                        &mut ingestion.derived_since,
                        &mut ingestion.hold_policy,
                    ),
                    CollectionStateExtra::None => {
                        unreachable!("set_hold_policies is only called for ingestions");
                    }
                    CollectionStateExtra::Export(export) => (
                        export.write_frontier.borrow(),
                        &mut export.derived_since,
                        &mut export.read_policy,
                    ),
                };

                let new_derived_since = policy.frontier(write_frontier);
                let mut update = swap_updates(derived_since, new_derived_since);
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }

                *hold_policy = policy;
            }
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

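    /// Records the new write frontier for the identified collection and
    /// downgrades its derived since according to its hold policy.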
    #[instrument(level = "debug", fields(updates))]
    fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
        let mut read_capability_changes = BTreeMap::default();

        if let Some(collection) = self.collections.get_mut(&id) {
            let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => (
                    &mut ingestion.write_frontier,
                    &mut ingestion.derived_since,
                    &ingestion.hold_policy,
                ),
                CollectionStateExtra::None => {
                    // Frontier updates for progress collections are expected
                    // and need no special handling; anything else is a bug.
                    if !matches!(collection.data_source, DataSource::Progress) {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
                CollectionStateExtra::Export(export) => (
                    &mut export.write_frontier,
                    &mut export.derived_since,
                    &export.read_policy,
                ),
            };

            if PartialOrder::less_than(write_frontier, new_upper) {
                write_frontier.clone_from(new_upper);
            }

            let new_derived_since = hold_policy.frontier(write_frontier.borrow());
            let mut update = swap_updates(derived_since, new_derived_since);
            if !update.is_empty() {
                read_capability_changes.insert(id, update);
            }
        } else if self.dropped_objects.contains_key(&id) {
            // Frontier updates for already-dropped objects are expected;
            // ignore them.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }

        if !read_capability_changes.is_empty() {
            self.update_hold_capabilities(&mut read_capability_changes);
        }
    }

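    /// Applies `updates` to the read capabilities of the affected collections
    /// and propagates the resulting downgrades to dependency read holds and
    /// the responsible storage instances.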
    #[instrument(level = "debug", fields(updates))]
    fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
        // Net changes and the new frontier per collection, to be acted on
        // below.
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the maximum ID, and the updates for it.
        while let Some(key) = updates.keys().next_back().cloned() {
            let mut update = updates.remove(&key).unwrap();

            if key.is_user() {
                debug!(id = %key, ?update, "update_hold_capability");
            }

            if let Some(collection) = self.collections.get_mut(&key) {
                match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        let changes = ingestion.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (
                                    <ChangeBatch<_>>::new(),
                                    Antichain::new(),
                                    ingestion.instance_id,
                                )
                            });

                        changes.extend(update.drain());
                        *frontier = ingestion.read_capabilities.frontier().to_owned();
                    }
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to update holds for collection {collection:?} which is not \
                             an ingestion: {update:?}"
                        );
                        continue;
                    }
                    CollectionStateExtra::Export(export) => {
                        let changes = export.read_capabilities.update_iter(update.drain());
                        update.extend(changes);

                        let (changes, frontier, _cluster_id) =
                            collections_net.entry(key).or_insert_with(|| {
                                (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
                            });

                        changes.extend(update.drain());
                        *frontier = export.read_capabilities.frontier().to_owned();
                    }
                }
            } else {
                tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
            }
        }

        // Translate the net changes into read hold downgrades and
        // `AllowCompaction` commands.
        for (key, (mut changes, frontier, cluster_id)) in collections_net {
            if !changes.is_empty() {
                if key.is_user() {
                    debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
                }

                let collection = self
                    .collections
                    .get_mut(&key)
                    .expect("missing collection state");

                let read_holds = match &mut collection.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => {
                        ingestion.dependency_read_holds.as_mut_slice()
                    }
                    CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
                    CollectionStateExtra::None => {
                        soft_panic_or_log!(
                            "trying to downgrade read holds for collection which is not an \
                             ingestion: {collection:?}"
                        );
                        continue;
                    }
                };

                for read_hold in read_holds.iter_mut() {
                    read_hold
                        .try_downgrade(frontier.clone())
                        .expect("we only advance the frontier");
                }

                if let Some(instance) = self.instances.get_mut(&cluster_id) {
                    instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
                } else {
                    soft_panic_or_log!(
                        "missing instance client for cluster {cluster_id} while we still have \
                         outstanding AllowCompaction command {frontier:?} for {key}"
                    );
                }
            }
        }
    }

    /// Validates that all identified collections exist.
    fn validate_collection_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.storage_collections.check_exists(id)?;
        }
        Ok(())
    }

    /// Validates that all identified collections are exports.
    fn validate_export_ids(
        &self,
        ids: impl Iterator<Item = GlobalId>,
    ) -> Result<(), StorageError<T>> {
        for id in ids {
            self.export(id)?;
        }
        Ok(())
    }

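    /// Opens a write handle for the identified collection's data shard and
    /// fetches its recent upper, so the returned handle starts out up to
    /// date.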
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let mut write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        // Fetch the latest upper, so callers see an up-to-date frontier.
        write.fetch_recent_upper().await;

        write
    }

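    /// Registers the given introspection collection with the collection
    /// manager, wiring it up as either a differential or an append-only
    /// collection depending on its introspection type.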
    fn register_introspection_collection(
        &mut self,
        id: GlobalId,
        introspection_type: IntrospectionType,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        persist_client: PersistClient,
    ) -> Result<(), StorageError<T>> {
        tracing::info!(%id, ?introspection_type, "registering introspection collection");

        // Migrated system collections must remain writable even in read-only
        // mode.
        let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            info!("writing to migrated storage collection {id} in read-only mode");
        }

        let prev = self.introspection_ids.insert(introspection_type, id);
        assert!(
            prev.is_none(),
            "cannot have multiple IDs for introspection type"
        );

        let metadata = self.storage_collections.collection_metadata(id)?.clone();

        let read_handle_fn = move || {
            let persist_client = persist_client.clone();
            let metadata = metadata.clone();

            let fut = async move {
                let read_handle = persist_client
                    .open_leased_reader::<SourceData, (), T, StorageDiff>(
                        metadata.data_shard,
                        Arc::new(metadata.relation_desc.clone()),
                        Arc::new(UnitSchema),
                        Diagnostics {
                            shard_name: id.to_string(),
                            handle_purpose: format!("snapshot {}", id),
                        },
                        USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
                    )
                    .await
                    .expect("invalid persist usage");
                read_handle
            };

            fut.boxed()
        };

        let recent_upper = write_handle.shared_upper();

        match CollectionManagerKind::from(&introspection_type) {
            CollectionManagerKind::Differential => {
                let statistics_retention_duration =
                    dyncfgs::STATISTICS_RETENTION_DURATION.get(self.config().config_set());

                let introspection_config = DifferentialIntrospectionConfig {
                    recent_upper,
                    introspection_type,
                    storage_collections: Arc::clone(&self.storage_collections),
                    collection_manager: self.collection_manager.clone(),
                    source_statistics: Arc::clone(&self.source_statistics),
                    sink_statistics: Arc::clone(&self.sink_statistics),
                    statistics_interval: self.config.parameters.statistics_interval.clone(),
                    statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
                    statistics_retention_duration,
                    metrics: self.metrics.clone(),
                    introspection_tokens: Arc::clone(&self.introspection_tokens),
                };
                self.collection_manager.register_differential_collection(
                    id,
                    write_handle,
                    read_handle_fn,
                    force_writable,
                    introspection_config,
                );
            }
            CollectionManagerKind::AppendOnly => {
                let introspection_config = AppendOnlyIntrospectionConfig {
                    introspection_type,
                    config_set: Arc::clone(self.config.config_set()),
                    parameters: self.config.parameters.clone(),
                    storage_collections: Arc::clone(&self.storage_collections),
                };
                self.collection_manager.register_append_only_collection(
                    id,
                    write_handle,
                    force_writable,
                    Some(introspection_config),
                );
            }
        }

        Ok(())
    }

    /// Removes statistics for sources and sinks that no longer exist.
    fn reconcile_dangling_statistics(&self) {
        self.source_statistics
            .lock()
            .expect("poisoned")
            .source_statistics
            .retain(|(k, _replica_id), _| self.storage_collections.check_exists(*k).is_ok());
        self.sink_statistics
            .lock()
            .expect("poisoned")
            .retain(|(k, _replica_id), _| self.export(*k).is_ok());
    }

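    /// Appends global ID to shard ID mappings to the `ShardMapping`
    /// introspection collection, with a `diff` of 1 for inserts and -1 for
    /// deletes.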
    #[instrument(level = "debug")]
    fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
    where
        I: Iterator<Item = GlobalId>,
    {
        mz_ore::soft_assert_or_log!(
            diff == Diff::MINUS_ONE || diff == Diff::ONE,
            "use 1 for insert or -1 for delete"
        );

        let id = *self
            .introspection_ids
            .get(&IntrospectionType::ShardMapping)
            .expect("should be registered before this call");

        let mut updates = vec![];
        // Pack one row at a time, reusing the allocation.
        let mut row_buf = Row::default();

        for global_id in global_ids {
            let shard_id = if let Some(collection) = self.collections.get(&global_id) {
                collection.collection_metadata.data_shard.clone()
            } else {
                panic!("unknown global id: {}", global_id);
            };

            let mut packer = row_buf.packer();
            packer.push(Datum::from(global_id.to_string().as_str()));
            packer.push(Datum::from(shard_id.to_string().as_str()));
            updates.push((row_buf.clone(), diff));
        }

        self.collection_manager.differential_append(id, updates);
    }

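    /// Determines, from its description, the IDs of all collections that the
    /// identified collection depends on.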
    fn determine_collection_dependencies(
        &self,
        self_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport { ingestion_id, .. } => {
                // Ingestion exports depend on their primary source's remap
                // collection.
                let source_collection = self.collection(*ingestion_id)?;
                let ingestion_remap_collection_id = match &source_collection.data_source {
                    DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };

                dependencies.extend([self_id, ingestion_remap_collection_id]);
            }
            // Ingestions depend on their remap collection.
            DataSource::Ingestion(ingestion) => {
                dependencies.push(self_id);
                if self_id != ingestion.remap_collection_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            // Sinks depend on the collection they export.
            DataSource::Sink { desc } => {
                dependencies.extend([self_id, desc.sink.from]);
            }
        };

        Ok(dependencies)
    }

    async fn read_handle_for_snapshot(
        &self,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let metadata = self.storage_collections.collection_metadata(id)?;
        read_handle_for_snapshot(&self.persist, id, &metadata).await
    }

    /// Appends the given status updates to the source and sink status history
    /// introspection collections. No-op in read-only mode.
    fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
        if self.read_only {
            return;
        }

        let mut sink_status_updates = vec![];
        let mut source_status_updates = vec![];

        for update in updates {
            let id = update.id;
            if self.export(id).is_ok() {
                sink_status_updates.push(update);
            } else if self.storage_collections.check_exists(id).is_ok() {
                source_status_updates.push(update);
            }
        }

        self.append_status_introspection_updates(
            IntrospectionType::SourceStatusHistory,
            source_status_updates,
        );
        self.append_status_introspection_updates(
            IntrospectionType::SinkStatusHistory,
            sink_status_updates,
        );
    }

    /// Returns the state of the identified collection, or an error if it
    /// does not exist.
    fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
        self.collections
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))
    }

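    /// Starts running the identified ingestion by sending a `RunIngestion`
    /// command to the responsible storage instance.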
    fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        tracing::info!(%id, "starting ingestion");

        let collection = self.collection(id)?;
        let ingestion_description = match &collection.data_source {
            DataSource::Ingestion(i) => i.clone(),
            _ => {
                tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
                Err(StorageError::IdentifierInvalid(id))?
            }
        };

        // Enrich each source export with the storage metadata of its output
        // collection.
        let mut source_exports = BTreeMap::new();
        for (export_id, export) in ingestion_description.source_exports.clone() {
            let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
            source_exports.insert(
                export_id,
                SourceExport {
                    storage_metadata: export_storage_metadata,
                    details: export.details,
                    data_config: export.data_config,
                },
            );
        }

        let remap_collection = self.collection(ingestion_description.remap_collection_id)?;

        let description = IngestionDescription::<CollectionMetadata> {
            source_exports,
            remap_metadata: remap_collection.collection_metadata.clone(),
            desc: ingestion_description.desc.clone(),
            instance_id: ingestion_description.instance_id,
            remap_collection_id: ingestion_description.remap_collection_id,
        };

        let storage_instance_id = description.instance_id;
        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id: id,
            })?;

        let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
        instance.send(StorageCommand::RunIngestion(augmented_ingestion));

        Ok(())
    }

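    /// Starts running the identified export by sending a `RunSink` command to
    /// the responsible storage instance.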
    fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
        let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
            return Err(StorageError::IdentifierMissing(id));
        };

        let from_storage_metadata = self
            .storage_collections
            .collection_metadata(description.sink.from)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        // Advance the as-of to at least the implied capability, and only
        // request the snapshot if the export hasn't already written data
        // beyond the as-of.
        let export_state = self.storage_collections.collection_frontiers(id)?;
        let mut as_of = description.sink.as_of.clone();
        as_of.join_assign(&export_state.implied_capability);
        let with_snapshot = description.sink.with_snapshot
            && !PartialOrder::less_than(&as_of, &export_state.write_frontier);

        info!(
            sink_id = %id,
            from_id = %description.sink.from,
            write_frontier = ?export_state.write_frontier,
            ?as_of,
            ?with_snapshot,
            "run_export"
        );

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: description.sink.from,
                from_desc: description.sink.from_desc.clone(),
                connection: description.sink.connection.clone(),
                envelope: description.sink.envelope,
                as_of,
                version: description.sink.version,
                from_storage_metadata,
                with_snapshot,
                to_storage_metadata,
            },
        };

        let storage_instance_id = description.instance_id.clone();

        let instance = self
            .instances
            .get_mut(&storage_instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));

        Ok(())
    }

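    /// Refreshes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections by diffing the current frontiers against the previously
    /// recorded ones.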
    fn update_frontier_introspection(&mut self) {
        let mut global_frontiers = BTreeMap::new();
        let mut replica_frontiers = BTreeMap::new();

        for collection_frontiers in self.storage_collections.active_collection_frontiers() {
            let id = collection_frontiers.id;
            let since = collection_frontiers.read_capabilities;
            let upper = collection_frontiers.write_frontier;

            let instance = self
                .collections
                .get(&id)
                .and_then(|collection_state| match &collection_state.extra_state {
                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                    CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                    CollectionStateExtra::None => None,
                })
                .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                for replica_id in instance.replica_ids() {
                    replica_frontiers.insert((id, replica_id), upper.clone());
                }
            }

            global_frontiers.insert(id, (since, upper));
        }

        let mut global_updates = Vec::new();
        let mut replica_updates = Vec::new();

        let mut push_global_update =
            |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
                let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    read_frontier,
                    write_frontier,
                ]);
                global_updates.push((row, diff));
            };

        let mut push_replica_update =
            |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
                let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
                let row = Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&replica_id.to_string()),
                    write_frontier,
                ]);
                replica_updates.push((row, diff));
            };

        // Diff the recorded global frontiers against the new ones, issuing
        // retractions for entries that changed or disappeared.
        let mut old_global_frontiers =
            std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
        for (&id, new) in &self.recorded_frontiers {
            match old_global_frontiers.remove(&id) {
                Some(old) if &old != new => {
                    push_global_update(id, new.clone(), Diff::ONE);
                    push_global_update(id, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_global_update(id, new.clone(), Diff::ONE),
            }
        }
        for (id, old) in old_global_frontiers {
            push_global_update(id, old, Diff::MINUS_ONE);
        }

        // Do the same for the per-replica frontiers.
        let mut old_replica_frontiers =
            std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
        for (&key, new) in &self.recorded_replica_frontiers {
            match old_replica_frontiers.remove(&key) {
                Some(old) if &old != new => {
                    push_replica_update(key, new.clone(), Diff::ONE);
                    push_replica_update(key, old, Diff::MINUS_ONE);
                }
                Some(_) => (),
                None => push_replica_update(key, new.clone(), Diff::ONE),
            }
        }
        for (key, old) in old_replica_frontiers {
            push_replica_update(key, old, Diff::MINUS_ONE);
        }

        let id = self.introspection_ids[&IntrospectionType::Frontiers];
        self.collection_manager
            .differential_append(id, global_updates);

        let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
        self.collection_manager
            .differential_append(id, replica_updates);
    }

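    /// Refreshes the wallclock lag measurements for all active collections:
    /// updates per-collection maxima and metrics, and stashes histogram
    /// counts for later recording.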
    fn refresh_wallclock_lag(&mut self) {
        let now_ms = (self.now)();
        let histogram_period =
            WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());

        let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
            Some(ts) => (self.wallclock_lag)(ts.clone()),
            None => Duration::ZERO,
        };

        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // A collection whose write frontier hasn't advanced beyond its
            // read frontier is unreadable, so its lag is undefined.
            let collection_unreadable =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let lag = if collection_unreadable {
                WallclockLag::Undefined
            } else {
                let lag = frontier_lag(&frontiers.write_frontier);
                WallclockLag::Seconds(lag.as_secs())
            };

            collection.wallclock_lag_max = collection.wallclock_lag_max.max(lag);

            // An undefined lag is reported to metrics as the maximum value.
            let secs = lag.unwrap_seconds_or(u64::MAX);
            collection.wallclock_lag_metrics.observe(secs);

            if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
                // Bucket lags by powers of two.
                let bucket = lag.map_seconds(|secs| secs.next_power_of_two());

                let instance_id = match &collection.extra_state {
                    CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
                    CollectionStateExtra::Export(e) => Some(e.cluster_id()),
                    CollectionStateExtra::None => None,
                };
                let workload_class = instance_id
                    .and_then(|id| self.instances.get(&id))
                    .and_then(|i| i.workload_class.clone());
                let labels = match workload_class {
                    Some(wc) => [("workload_class", wc.clone())].into(),
                    None => BTreeMap::new(),
                };

                let key = (histogram_period, bucket, labels);
                *stash.entry(key).or_default() += Diff::ONE;
            }
        }

        self.maybe_record_wallclock_lag();
    }

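    /// Records wallclock lag history and histogram updates, if the recording
    /// interval has elapsed since the last recording. No-op in read-only
    /// mode.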
    fn maybe_record_wallclock_lag(&mut self) {
        if self.read_only {
            return;
        }

        let duration_trunc = |datetime: DateTime<_>, interval| {
            let td = TimeDelta::from_std(interval).ok()?;
            datetime.duration_trunc(td).ok()
        };

        // Truncate `now` to the recording interval, so recordings land on
        // interval boundaries.
        let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
        let now_dt = mz_ore::now::to_datetime((self.now)());
        let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
            soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
            let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
            duration_trunc(now_dt, *default).unwrap()
        });
        if now_trunc <= self.wallclock_lag_last_recorded {
            return;
        }

        let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");

        let mut history_updates = Vec::new();
        let mut histogram_updates = Vec::new();
        let mut row_buf = Row::default();
        for frontiers in self.storage_collections.active_collection_frontiers() {
            let id = frontiers.id;
            let Some(collection) = self.collections.get_mut(&id) else {
                continue;
            };

            // Record the maximum lag observed since the last recording, then
            // reset it.
            let max_lag = std::mem::replace(&mut collection.wallclock_lag_max, WallclockLag::MIN);
            let row = Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::Null,
                max_lag.into_interval_datum(),
                Datum::TimestampTz(now_ts),
            ]);
            history_updates.push((row, Diff::ONE));

            let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
                continue;
            };

            for ((period, lag, labels), count) in std::mem::take(stash) {
                let mut packer = row_buf.packer();
                packer.extend([
                    Datum::TimestampTz(period.start),
                    Datum::TimestampTz(period.end),
                    Datum::String(&id.to_string()),
                    lag.into_uint64_datum(),
                ]);
                let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
                packer.push_dict(labels);

                histogram_updates.push((row_buf.clone(), count));
            }
        }

        if !history_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistory,
                history_updates,
            );
        }
        if !histogram_updates.is_empty() {
            self.append_introspection_updates(
                IntrospectionType::WallclockLagHistogram,
                histogram_updates,
            );
        }

        self.wallclock_lag_last_recorded = now_trunc;
    }

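    /// Performs periodic maintenance work: refreshing frontier introspection,
    /// wallclock lag measurements, and instance state metrics.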
    fn maintain(&mut self) {
        self.update_frontier_introspection();
        self.refresh_wallclock_lag();

        for instance in self.instances.values_mut() {
            instance.refresh_state_metrics();
        }
    }
}

impl From<&IntrospectionType> for CollectionManagerKind {
    fn from(value: &IntrospectionType) -> Self {
        match value {
            IntrospectionType::ShardMapping
            | IntrospectionType::Frontiers
            | IntrospectionType::ReplicaFrontiers
            | IntrospectionType::StorageSourceStatistics
            | IntrospectionType::StorageSinkStatistics
            | IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,

            IntrospectionType::SourceStatusHistory
            | IntrospectionType::SinkStatusHistory
            | IntrospectionType::PrivatelinkConnectionStatusHistory
            | IntrospectionType::ReplicaStatusHistory
            | IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram
            | IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
        }
    }
}

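/// Snapshots the contents of a statistics collection at the timestamp just
/// before `upper`, returning the plain rows. Returns an empty `Vec` if the
/// collection is not yet readable.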
async fn snapshot_statistics<T>(
    id: GlobalId,
    upper: Antichain<T>,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Vec<Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    match upper.as_option() {
        Some(f) if f > &T::minimum() => {
            let as_of = f.step_back().unwrap();

            let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
            snapshot
                .into_iter()
                .map(|(row, diff)| {
                    // Each row in a consolidated statistics snapshot should
                    // appear exactly once.
                    assert_eq!(diff, 1);
                    row
                })
                .collect()
        }
        // The shard is closed or still at the minimum frontier; there is
        // nothing to read yet.
        _ => Vec::new(),
    }
}

/// Opens a leased read handle suitable for snapshotting the identified
/// collection.
async fn read_handle_for_snapshot<T>(
    persist: &PersistClientCache,
    id: GlobalId,
    metadata: &CollectionMetadata,
) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let persist_client = persist
        .open(metadata.persist_location.clone())
        .await
        .unwrap();

    let read_handle = persist_client
        .open_leased_reader::<SourceData, (), _, _>(
            metadata.data_shard,
            Arc::new(metadata.relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("snapshot {}", id),
            },
            USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid persist usage");
    Ok(read_handle)
}

/// State maintained about individual collections.
#[derive(Debug)]
struct CollectionState<T: TimelyTimestamp> {
    /// The source of this collection's data.
    pub data_source: DataSource<T>,

    pub collection_metadata: CollectionMetadata,

    pub extra_state: CollectionStateExtra<T>,

    /// Maximum wallclock lag observed since the last introspection recording.
    wallclock_lag_max: WallclockLag,
    /// Accumulated wallclock lag histogram counts, awaiting their next
    /// recording to the introspection collection. `None` for collections
    /// whose lag is not tracked.
    wallclock_lag_histogram_stash: Option<
        BTreeMap<
            (
                WallclockLagHistogramPeriod,
                WallclockLag,
                BTreeMap<&'static str, String>,
            ),
            Diff,
        >,
    >,
    /// Per-collection wallclock lag metrics.
    wallclock_lag_metrics: WallclockLagMetrics,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    fn new(
        data_source: DataSource<T>,
        collection_metadata: CollectionMetadata,
        extra_state: CollectionStateExtra<T>,
        wallclock_lag_metrics: WallclockLagMetrics,
    ) -> Self {
        // Lag histograms are not tracked for `DataSource::Other` collections.
        let wallclock_lag_histogram_stash = match &data_source {
            DataSource::Other => None,
            _ => Some(Default::default()),
        };

        Self {
            data_source,
            collection_metadata,
            extra_state,
            wallclock_lag_max: WallclockLag::MIN,
            wallclock_lag_histogram_stash,
            wallclock_lag_metrics,
        }
    }
}

/// Additional state maintained for ingestions and exports, but not for other
/// kinds of collections.
#[derive(Debug)]
enum CollectionStateExtra<T: TimelyTimestamp> {
    Ingestion(IngestionState<T>),
    Export(ExportState<T>),
    None,
}

/// State maintained about ingestions.
#[derive(Debug)]
struct IngestionState<T: TimelyTimestamp> {
    /// Accumulated read capabilities for this ingestion.
    pub read_capabilities: MutableAntichain<T>,

    /// The since frontier derived from the write frontier via the hold
    /// policy.
    pub derived_since: Antichain<T>,

    /// Read holds on this ingestion's dependencies.
    pub dependency_read_holds: Vec<ReadHold<T>>,

    /// The most recently reported write frontier.
    pub write_frontier: Antichain<T>,

    /// The policy from which `derived_since` is derived.
    pub hold_policy: ReadPolicy<T>,

    /// The ID of the storage instance this ingestion runs on.
    pub instance_id: StorageInstanceId,

    /// The replicas on which this ingestion is currently hydrated.
    pub hydrated_on: BTreeSet<ReplicaId>,
}

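/// Describes how to extract the key and timestamp from rows of a status
/// history collection, and which retention policy to apply to it.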
struct StatusHistoryDesc<K> {
    retention_policy: StatusHistoryRetentionPolicy,
    extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
    extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
}

enum StatusHistoryRetentionPolicy {
    /// Keep only the last N entries per key.
    LastN(usize),
    /// Keep only entries from the last time window.
    TimeWindow(Duration),
}

fn source_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
    let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_source_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn sink_status_history_desc(
    params: &StorageParameters,
) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
    let desc = &MZ_SINK_STATUS_HISTORY_DESC;
    let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
    let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_sink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
                if datums[replica_id_idx].is_null() {
                    None
                } else {
                    Some(
                        ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
                            .expect("ReplicaId column"),
                    )
                },
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
    let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
    let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::LastN(
            params.keep_n_privatelink_status_history_entries,
        ),
        extract_key: Box::new(move |datums| {
            GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
    let desc = &REPLICA_STATUS_HISTORY_DESC;
    let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
    let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
    let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");

    StatusHistoryDesc {
        retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
            params.replica_status_history_retention_window,
        ),
        extract_key: Box::new(move |datums| {
            (
                GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
                datums[process_idx].unwrap_uint64(),
            )
        }),
        extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
    }
}

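/// Replaces `from` with `replace_with` if that advances the frontier, and
/// returns a `ChangeBatch` describing the difference. If the frontier would
/// regress, `from` is left unchanged and the returned batch is empty.
///
/// Illustrative example (not a doctest; assumes `u64` timestamps):
///
/// ```ignore
/// let mut from = Antichain::from_elem(3u64);
/// let update = swap_updates(&mut from, Antichain::from_elem(5u64));
/// // `from` is now {5}, and `update` contains {(5, +1), (3, -1)}.
/// ```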
fn swap_updates<T: Timestamp>(
    from: &mut Antichain<T>,
    mut replace_with: Antichain<T>,
) -> ChangeBatch<T> {
    let mut update = ChangeBatch::new();
    if PartialOrder::less_equal(from, &replace_with) {
        update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
        std::mem::swap(from, &mut replace_with);
        update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
    }
    update
}