use std::any::Any;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::num::NonZeroI64;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::StreamExt;
use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION,
    WALLCLOCK_LAG_RECORDING_INTERVAL,
};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_client::client::{
    AppendOnlyUpdate, ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand,
    RunOneshotIngestion, RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse,
    TableData,
};
use mz_storage_client::controller::{
    BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState,
    IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController,
    StorageMetadata, StorageTxn, StorageWriteOp, WallclockLagHistogramPeriod,
};
use mz_storage_client::healthcheck::{
    MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC,
    MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_STATUS_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{
    SinkStatisticsUpdate, SourceStatisticsUpdate, WebhookStatistics,
};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
    SourceExport, SourceExportDataConfig,
};
use mz_storage_types::{AlterCompatible, StorageDiff, dyncfgs};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::watch::{Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tokio::time::error::Elapsed;
use tracing::{debug, info, warn};

use crate::collection_mgmt::{
    AppendOnlyIntrospectionConfig, CollectionManagerKind, DifferentialIntrospectionConfig,
};
use crate::instance::{Instance, ReplicaConfig};
use crate::statistics::StatsState;

mod collection_mgmt;
mod history;
mod instance;
mod persist_handles;
mod rtr;
mod statistics;

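/// A oneshot ingestion that has been sent to a cluster but whose result
/// batches have not yet been handed back to the caller.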
#[derive(Derivative)]
#[derivative(Debug)]
struct PendingOneshotIngestion {
    #[derivative(Debug = "ignore")]
    result_tx: OneshotResultCallback<ProtoBatch>,
    cluster_id: StorageInstanceId,
}

impl PendingOneshotIngestion {
    pub(crate) fn cancel(self) {
        (self.result_tx)(vec![Err("canceled".to_string())])
    }
}

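/// The storage controller: tracks the state of storage collections, brokers
/// commands to and responses from storage instances (clusters), and mediates
/// access to the persist shards backing those collections.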
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation>
{
    build_info: &'static BuildInfo,
    now: NowFn,
    envd_epoch: NonZeroI64,

    read_only: bool,

    pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,

    dropped_objects: BTreeMap<GlobalId, BTreeSet<ReplicaId>>,

    pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
    txns_read: TxnsRead<T>,
    txns_metrics: Arc<TxnMetrics>,
    stashed_responses: Vec<(Option<ReplicaId>, StorageResponse<T>)>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_tx: mpsc::UnboundedSender<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
    #[derivative(Debug = "ignore")]
    pending_oneshot_ingestions: BTreeMap<uuid::Uuid, PendingOneshotIngestion>,

    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,

    pub(crate) introspection_ids: BTreeMap<IntrospectionType, GlobalId>,
    introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,

    source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    sink_statistics: Arc<Mutex<BTreeMap<GlobalId, statistics::StatsState<SinkStatisticsUpdate>>>>,
    statistics_interval_sender: Sender<Duration>,

    instances: BTreeMap<StorageInstanceId, Instance<T>>,
    initialized: bool,
    config: StorageConfiguration,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    metrics: StorageControllerMetrics,
    recorded_frontiers: BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)>,
    recorded_replica_frontiers: BTreeMap<(GlobalId, ReplicaId), Antichain<T>>,

    #[derivative(Debug = "ignore")]
    wallclock_lag: WallclockLagFn<T>,
    wallclock_lag_last_recorded: DateTime<Utc>,

    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    migrated_storage_collections: BTreeSet<GlobalId>,

    maintenance_ticker: tokio::time::Interval,
    maintenance_scheduled: bool,

    instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
    instance_response_rx: mpsc::UnboundedReceiver<(Option<ReplicaId>, StorageResponse<T>)>,

    persist_warm_task: Option<AbortOnDropHandle<Box<dyn Debug + Send>>>,
}

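/// Spawns a background task that opens a batch fetcher for each of `shard_ids`
/// (with bounded concurrency), warming up persist state before it is first needed.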
fn warm_persist_state_in_background(
    client: PersistClient,
    shard_ids: impl Iterator<Item = ShardId> + Send + 'static,
) -> mz_ore::task::JoinHandle<Box<dyn Debug + Send>> {
    const MAX_CONCURRENT_WARMS: usize = 16;
    let logic = async move {
        let fetchers: Vec<_> = tokio_stream::iter(shard_ids)
            .map(|shard_id| {
                let client = client.clone();
                async move {
                    client
                        .create_batch_fetcher::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                            shard_id,
                            Arc::new(RelationDesc::empty()),
                            Arc::new(UnitSchema),
                            true,
                            Diagnostics::from_purpose("warm persist load state"),
                        )
                        .await
                }
            })
            .buffer_unordered(MAX_CONCURRENT_WARMS)
            .collect()
            .await;
        let fetchers: Box<dyn Debug + Send> = Box::new(fetchers);
        fetchers
    };
    mz_ore::task::spawn(|| "warm_persist_load_state", logic)
}

#[async_trait(?Send)]
impl<T> StorageController for Controller<T>
where
    T: Timestamp
        + Lattice
        + TotalOrder
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<Datum<'static>>
        + Display,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,
{
    type Timestamp = T;

    fn initialization_complete(&mut self) {
        self.reconcile_dangling_statistics();
        self.initialized = true;

        for instance in self.instances.values_mut() {
            instance.send(StorageCommand::InitializationComplete);
        }
    }

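    // Propagates updated storage parameters to the storage collections layer, the
    // persist client configuration, every storage instance, and the statistics and
    // collection-management intervals derived from them.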
    fn update_parameters(&mut self, config_params: StorageParameters) {
        self.storage_collections
            .update_parameters(config_params.clone());

        self.persist.cfg().apply_from(&config_params.dyncfg_updates);

        for instance in self.instances.values_mut() {
            let params = Box::new(config_params.clone());
            instance.send(StorageCommand::UpdateConfiguration(params));
        }
        self.config.update(config_params);
        self.statistics_interval_sender
            .send_replace(self.config.parameters.statistics_interval);
        self.collection_manager.update_user_batch_duration(
            self.config
                .parameters
                .user_storage_managed_collections_batch_duration,
        );
    }

    fn config(&self) -> &StorageConfiguration {
        &self.config
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        self.storage_collections.collection_metadata(id)
    }

    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let collection = self.collection(collection_id)?;

        let instance_id = match &collection.data_source {
            DataSource::Ingestion(ingestion_description) => ingestion_description.instance_id,
            DataSource::IngestionExport { ingestion_id, .. } => {
                let ingestion_state = self.collections.get(ingestion_id).expect("known to exist");

                let instance_id = match &ingestion_state.data_source {
                    DataSource::Ingestion(ingestion_desc) => ingestion_desc.instance_id,
                    _ => unreachable!("SourceExport must only refer to primary source"),
                };

                instance_id
            }
            _ => return Ok(true),
        };

        let instance = self.instances.get(&instance_id).ok_or_else(|| {
            StorageError::IngestionInstanceMissing {
                storage_instance_id: instance_id,
                ingestion_id: collection_id,
            }
        })?;

        if instance.replica_ids().next().is_none() {
            return Ok(true);
        }

        match &collection.extra_state {
            CollectionStateExtra::Ingestion(ingestion_state) => {
                Ok(ingestion_state.hydrated_on.len() >= 1)
            }
            CollectionStateExtra::Export(_) => {
                Ok(true)
            }
            CollectionStateExtra::None => {
                Ok(true)
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>> {
        let target_replicas: Option<BTreeSet<ReplicaId>> =
            target_replica_ids.map(|ids| ids.into_iter().collect());

        let mut all_hydrated = true;
        for (collection_id, collection_state) in self.collections.iter() {
            if collection_id.is_transient() || exclude_collections.contains(collection_id) {
                continue;
            }
            let hydrated = match &collection_state.extra_state {
                CollectionStateExtra::Ingestion(state) => {
                    match &target_replicas {
                        Some(target_replicas) => !state.hydrated_on.is_disjoint(target_replicas),
                        None => {
                            state.hydrated_on.len() >= 1
                        }
                    }
                }
                CollectionStateExtra::Export(_) => {
                    true
                }
                CollectionStateExtra::None => {
                    true
                }
            };
            if !hydrated {
                tracing::info!(%collection_id, "collection is not hydrated on any replica");
                all_hydrated = false;
            }
        }
        Ok(all_hydrated)
    }

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    > {
        let frontiers = self.storage_collections.collection_frontiers(id)?;
        Ok((frontiers.implied_capability, frontiers.write_frontier))
    }

    fn collections_frontiers(
        &self,
        mut ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<T>, Antichain<T>)>, StorageError<Self::Timestamp>> {
        let mut result = vec![];
        ids.retain(|&id| match self.export(id) {
            Ok(export) => {
                result.push((
                    id,
                    export.input_hold().since().clone(),
                    export.write_frontier.clone(),
                ));
                false
            }
            Err(_) => true,
        });
        result.extend(
            self.storage_collections
                .collections_frontiers(ids)?
                .into_iter()
                .map(|frontiers| {
                    (
                        frontiers.id,
                        frontiers.implied_capability,
                        frontiers.write_frontier,
                    )
                }),
        );

        Ok(result)
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        self.storage_collections.active_collection_metadatas()
    }

    fn active_ingestions(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_> {
        Box::new(self.instances[&instance_id].active_ingestions())
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections.check_exists(id)
    }

    fn create_instance(&mut self, id: StorageInstanceId) {
        let metrics = self.metrics.for_instance(id);
        let mut instance = Instance::new(
            self.envd_epoch,
            metrics,
            Arc::clone(self.config().config_set()),
            self.now.clone(),
            self.instance_response_tx.clone(),
        );
        if self.initialized {
            instance.send(StorageCommand::InitializationComplete);
        }
        if !self.read_only {
            instance.send(StorageCommand::AllowWrites);
        }

        let params = Box::new(self.config.parameters.clone());
        instance.send(StorageCommand::UpdateConfiguration(params));

        let old_instance = self.instances.insert(id, instance);
        assert_none!(old_instance, "storage instance {id} already exists");
    }

    fn drop_instance(&mut self, id: StorageInstanceId) {
        let instance = self.instances.remove(&id);
        assert!(instance.is_some(), "storage instance {id} does not exist");
    }

    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    ) {
        let instance = self
            .instances
            .get_mut(&id)
            .unwrap_or_else(|| panic!("instance {id} does not exist"));

        instance.workload_class = workload_class;
    }

    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    ) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let config = ReplicaConfig {
            build_info: self.build_info,
            location,
            grpc_client: self.config.parameters.grpc_client.clone(),
        };
        instance.add_replica(replica_id, config);
    }

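    // Dropping a replica also emits `Paused` status updates for the ingestions and
    // sinks that were running on it, so that status history reflects the loss of
    // the replica.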
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId) {
        let instance = self
            .instances
            .get_mut(&instance_id)
            .unwrap_or_else(|| panic!("instance {instance_id} does not exist"));

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut source_status_updates = vec![];
        let mut sink_status_updates = vec![];

        let make_update = |id, object_type| StatusUpdate {
            id,
            status: Status::Paused,
            timestamp: status_now,
            error: None,
            hints: BTreeSet::from([format!(
                "The replica running this {object_type} has been dropped"
            )]),
            namespaced_errors: Default::default(),
            replica_id: Some(replica_id),
        };

        for id in instance.active_ingestions() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            let ingestion = self
                .collections
                .get_mut(id)
                .expect("instance contains unknown ingestion");

            let ingestion_description = match &ingestion.data_source {
                DataSource::Ingestion(ingestion_description) => ingestion_description.clone(),
                _ => panic!(
                    "unexpected data source for ingestion: {:?}",
                    ingestion.data_source
                ),
            };

            let subsource_ids = ingestion_description
                .collection_ids()
                .filter(|id| id != &ingestion_description.remap_collection_id);
            for id in subsource_ids {
                source_status_updates.push(make_update(id, "source"));
            }
        }

        for id in instance.active_exports() {
            if let Some(active_replicas) = self.dropped_objects.get_mut(id) {
                active_replicas.remove(&replica_id);
                if active_replicas.is_empty() {
                    self.dropped_objects.remove(id);
                }
            }

            sink_status_updates.push(make_update(*id, "sink"));
        }

        instance.drop_replica(replica_id);

        if !self.read_only {
            if !source_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SourceStatusHistory,
                    source_status_updates,
                );
            }
            if !sink_status_updates.is_empty() {
                self.append_status_introspection_updates(
                    IntrospectionType::SinkStatusHistory,
                    sink_status_updates,
                );
            }
        }
    }

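    // For each collection, looks up the latest registered persist schema and
    // attempts a compare-and-evolve to the provided `RelationDesc`, surfacing
    // races and incompatible evolutions as errors.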
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        for (global_id, relation_desc) in collections {
            let shard_id = storage_metadata.get_collection_shard(global_id)?;
            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let latest_schema = persist_client
                .latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("invalid persist usage");
            let Some((schema_id, current_schema, _)) = latest_schema else {
                tracing::debug!(?global_id, "no schema registered");
                continue;
            };
            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

            let diagnostics = Diagnostics {
                shard_name: global_id.to_string(),
                handle_purpose: "evolve nullability for bootstrap".to_string(),
            };
            let evolve_result = persist_client
                .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                    shard_id,
                    schema_id,
                    &relation_desc,
                    &UnitSchema,
                    diagnostics,
                )
                .await
                .expect("invalid persist usage");
            match evolve_result {
                CaESchema::Ok(_) => (),
                CaESchema::ExpectedMismatch {
                    schema_id,
                    key,
                    val: _,
                } => {
                    return Err(StorageError::PersistSchemaEvolveRace {
                        global_id,
                        shard_id,
                        schema_id,
                        relation_desc: key,
                    });
                }
                CaESchema::Incompatible => {
                    return Err(StorageError::PersistInvalidSchemaEvolve {
                        global_id,
                        shard_id,
                    });
                }
            };
        }

        Ok(())
    }

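    // Expects the collection-to-shard mappings to already be present in
    // `storage_metadata`; validates that IDs are not reused, opens the backing
    // persist write handles, registers collection state, and finally schedules
    // ingestions and exports to run (subject to read-only mode).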
    #[instrument(name = "storage::create_collections")]
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.migrated_storage_collections
            .extend(migrated_storage_collections.iter().cloned());

        self.storage_collections
            .create_collections_for_bootstrap(
                storage_metadata,
                register_ts.clone(),
                collections.clone(),
                migrated_storage_collections,
            )
            .await?;

        drop(self.persist_warm_task.take());

        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
                    Ok(shard)
                };

                let remap_shard = match &description.data_source {
                    DataSource::Ingestion(IngestionDescription {
                        remap_collection_id,
                        ..
                    }) => {
                        Some(get_shard(*remap_collection_id)?)
                    }
                    _ => None,
                };

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    remap_shard,
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!(
                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
                        id, metadata.remap_shard, metadata.data_shard
                    );

                    let write = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    Ok::<_, StorageError<T>>((id, description, write, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        let mut to_execute = BTreeSet::new();
        let mut new_collections = BTreeSet::new();
        let mut table_registers = Vec::with_capacity(to_register.len());

        to_register.sort_by_key(|(id, ..)| *id);

        let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
            .into_iter()
            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
        let to_register = tables_to_register
            .into_iter()
            .rev()
            .chain(collections_to_register.into_iter());

        let mut new_source_statistic_entries = BTreeSet::new();
        let mut new_webhook_statistic_entries = BTreeSet::new();
        let mut new_sink_statistic_entries = BTreeSet::new();

        for (id, description, write, metadata) in to_register {
            let is_in_txns = |id, metadata: &CollectionMetadata| {
                metadata.txns_shard.is_some()
                    && !(self.read_only && migrated_storage_collections.contains(&id))
            };

            let mut data_source = description.data_source;

            to_execute.insert(id);
            new_collections.insert(id);

            if let DataSource::Ingestion(ingestion) = &mut data_source {
                let export = ingestion.desc.primary_source_export();
                ingestion.source_exports.insert(id, export);
            }
            let write_frontier = write.upper();

            let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;

            let dependency_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_dependencies)
                .expect("can acquire read holds");

            let mut dependency_since = Antichain::from_elem(T::minimum());
            for read_hold in dependency_read_holds.iter() {
                dependency_since.join_assign(read_hold.since());
            }

            if !dependency_read_holds.is_empty()
                && !is_in_txns(id, &metadata)
                && !matches!(&data_source, DataSource::Sink { .. })
            {
                mz_ore::soft_assert_or_log!(
                    write_frontier.elements() == &[T::minimum()]
                        || write_frontier.is_empty()
                        || PartialOrder::less_than(&dependency_since, write_frontier),
                    "dependency since has advanced past dependent ({id}) upper \n
                    dependent ({id}): upper {:?} \n
                    dependency since {:?} \n
                    dependency read holds: {:?}",
                    write_frontier,
                    dependency_since,
                    dependency_read_holds,
                );
            }

            let mut extra_state = CollectionStateExtra::None;
            let mut maybe_instance_id = None;
            match &data_source {
                DataSource::Introspection(typ) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    self.register_introspection_collection(
                        id,
                        *typ,
                        write,
                        persist_client.clone(),
                    )?;
                }
                DataSource::Webhook => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist monotonic worker",
                    );
                    new_source_statistic_entries.insert(id);
                    new_webhook_statistic_entries.insert(id);
                    self.collection_manager
                        .register_append_only_collection(id, write, false, None);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                    let ingestion_state = self
                        .collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");

                    let instance_id = match &mut ingestion_state.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            ingestion_desc.source_exports.insert(
                                id,
                                SourceExport {
                                    storage_metadata: (),
                                    details: details.clone(),
                                    data_config: data_config.clone(),
                                },
                            );

                            ingestion_desc.instance_id
                        }
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    to_execute.remove(&id);
                    to_execute.insert(*ingestion_id);

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Table { .. } => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "registering {id} with persist table worker",
                    );
                    table_registers.push((id, write));
                }
                DataSource::Progress | DataSource::Other => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );
                }
                DataSource::Ingestion(ingestion_desc) => {
                    debug!(
                        ?data_source, meta = ?metadata,
                        "not registering {id} with a controller persist worker",
                    );

                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let ingestion_state = IngestionState {
                        read_capabilities: MutableAntichain::from(dependency_since.clone()),
                        dependency_read_holds,
                        derived_since: dependency_since,
                        write_frontier: Antichain::from_elem(Self::Timestamp::minimum()),
                        hold_policy: ReadPolicy::step_back(),
                        instance_id: ingestion_desc.instance_id,
                        hydrated_on: BTreeSet::new(),
                    };

                    extra_state = CollectionStateExtra::Ingestion(ingestion_state);
                    maybe_instance_id = Some(ingestion_desc.instance_id);

                    new_source_statistic_entries.insert(id);
                }
                DataSource::Sink { desc } => {
                    let mut dependency_since = Antichain::from_elem(T::minimum());
                    for read_hold in dependency_read_holds.iter() {
                        dependency_since.join_assign(read_hold.since());
                    }

                    let [self_hold, read_hold] =
                        dependency_read_holds.try_into().expect("two holds");

                    let state = ExportState::new(
                        desc.instance_id,
                        read_hold,
                        self_hold,
                        write_frontier.clone(),
                        ReadPolicy::step_back(),
                    );
                    maybe_instance_id = Some(state.cluster_id);
                    extra_state = CollectionStateExtra::Export(state);

                    new_sink_statistic_entries.insert(id);
                }
            }

            let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id);
            let collection_state =
                CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics);

            self.collections.insert(id, collection_state);
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");

            for id in new_source_statistic_entries {
                source_statistics
                    .source_statistics
                    .entry(id)
                    .or_insert_with(|| StatsState::new(SourceStatisticsUpdate::new(id)));
            }
            for id in new_webhook_statistic_entries {
                source_statistics.webhook_statistics.entry(id).or_default();
            }
            for id in new_sink_statistic_entries {
                sink_statistics
                    .entry(id)
                    .or_insert_with(|| StatsState::new(SinkStatisticsUpdate::new(id)));
            }
        }

        if !table_registers.is_empty() {
            let register_ts = register_ts
                .expect("caller should have provided a register_ts when creating a table");

            if self.read_only {
                table_registers
                    .retain(|(id, _write_handle)| migrated_storage_collections.contains(id));

                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            } else {
                self.persist_table_worker
                    .register(register_ts, table_registers)
                    .await
                    .expect("table worker unexpectedly shut down");
            }
        }

        self.append_shard_mappings(new_collections.into_iter(), Diff::ONE);

        for id in to_execute {
            match &self.collection(id)?.data_source {
                DataSource::Ingestion(ingestion) => {
                    if !self.read_only
                        || (ENABLE_0DT_DEPLOYMENT_SOURCES.get(self.config.config_set())
                            && ingestion.desc.connection.supports_read_only())
                    {
                        self.run_ingestion(id)?;
                    }
                }
                DataSource::IngestionExport { .. } => unreachable!(
                    "ingestion exports do not execute directly, but instead schedule their source to be re-executed"
                ),
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::Table { .. }
                | DataSource::Progress
                | DataSource::Other => {}
                DataSource::Sink { .. } => {
                    if !self.read_only {
                        self.run_export(id)?;
                    }
                }
            };
        }

        Ok(())
    }

    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let source_collection = self.collection(ingestion_id)?;
        let data_source = &source_collection.data_source;
        match &data_source {
            DataSource::Ingestion(cur_ingestion) => {
                cur_ingestion
                    .desc
                    .alter_compatible(ingestion_id, source_desc)?;
            }
            o => {
                tracing::info!(
                    "{ingestion_id} inalterable because its data source is {:?} and not an ingestion",
                    o
                );
                Err(AlterError { id: ingestion_id })?
            }
        }

        Ok(())
    }

    async fn alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.check_alter_ingestion_source_desc(ingestion_id, &source_desc)?;

        let collection = self
            .collections
            .get_mut(&ingestion_id)
            .expect("validated exists");
        let curr_ingestion = match &mut collection.data_source {
            DataSource::Ingestion(curr_ingestion) => curr_ingestion,
            _ => unreachable!("verified collection refers to ingestion"),
        };

        curr_ingestion.desc = source_desc;
        tracing::debug!("altered {ingestion_id}'s SourceDesc");

        Ok(())
    }

    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_connections(source_connections.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (id, conn) in source_connections {
            let collection = self
                .collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.data_source {
                DataSource::Ingestion(ingestion) => {
                    if ingestion.desc.connection != conn {
                        tracing::info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                        ingestions_to_run.insert(id);
                    } else {
                        tracing::warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.storage_collections
            .alter_ingestion_export_data_configs(source_exports.clone())
            .await?;

        let mut ingestions_to_run = BTreeSet::new();

        for (source_export_id, new_data_config) in source_exports {
            let source_export_collection = self
                .collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            let ingestion_collection = self
                .collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;

                        ingestions_to_run.insert(ingestion_id);
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?
                }
            }
        }

        for id in ingestions_to_run {
            self.run_ingestion(id)?;
        }
        Ok(())
    }

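    // Registers the new collection under `new_collection` (sharing the existing
    // table's data shard) and re-points the existing collection at it via
    // `DataSource::Table { primary }`.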
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let Controller {
                collections,
                storage_collections,
                ..
            } = self;

            let existing = collections
                .get(&existing_collection)
                .ok_or(StorageError::IdentifierMissing(existing_collection))?;
            if !matches!(existing.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            storage_collections
                .alter_table_desc(
                    existing_collection,
                    new_collection,
                    new_desc.clone(),
                    expected_version,
                )
                .await?;

            existing.collection_metadata.data_shard.clone()
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .expect("invalid persist location");
        let write_handle = self
            .open_data_handles(
                &existing_collection,
                data_shard,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
        let collection_meta = CollectionMetadata {
            persist_location: self.persist_location.clone(),
            data_shard,
            relation_desc: new_desc.clone(),
            remap_shard: None,
            txns_shard: Some(self.txns_read.txns_id().clone()),
        };
        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
        let collection_state = CollectionState::new(
            collection_desc.data_source.clone(),
            collection_meta,
            CollectionStateExtra::None,
            wallclock_lag_metrics,
        );

        self.collections.insert(new_collection, collection_state);
        let existing = self
            .collections
            .get_mut(&existing_collection)
            .expect("missing existing collection");
        assert!(matches!(
            existing.data_source,
            DataSource::Table { primary: None }
        ));
        existing.data_source = DataSource::Table {
            primary: Some(new_collection),
        };

        self.persist_table_worker
            .register(register_ts, vec![(new_collection, write_handle)])
            .await
            .expect("table worker unexpectedly shut down");

        Ok(())
    }

    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get(&id)
            .and_then(|c| match &c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collections
            .get_mut(&id)
            .and_then(|c| match &mut c.extra_state {
                CollectionStateExtra::Export(state) => Some(state),
                _ => None,
            })
            .ok_or(StorageError::IdentifierMissing(id))
    }

    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let collection_meta = self
            .collections
            .get(&collection_id)
            .ok_or_else(|| StorageError::IdentifierMissing(collection_id))?
            .collection_metadata
            .clone();
        let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
            StorageError::Generic(anyhow::anyhow!("missing cluster {instance_id}"))
        })?;
        let oneshot_cmd = RunOneshotIngestion {
            ingestion_id,
            collection_id,
            collection_meta,
            request,
        };

        if !self.read_only {
            instance.send(StorageCommand::RunOneshotIngestion(Box::new(oneshot_cmd)));
            let pending = PendingOneshotIngestion {
                result_tx,
                cluster_id: instance_id,
            };
            let novel = self
                .pending_oneshot_ingestions
                .insert(ingestion_id, pending);
            assert_none!(novel);
            Ok(())
        } else {
            Err(StorageError::ReadOnly)
        }
    }

    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        if self.read_only {
            return Err(StorageError::ReadOnly);
        }

        let pending = self
            .pending_oneshot_ingestions
            .remove(&ingestion_id)
            .ok_or_else(|| {
                StorageError::Generic(anyhow::anyhow!("missing oneshot ingestion {ingestion_id}"))
            })?;

        match self.instances.get_mut(&pending.cluster_id) {
            Some(instance) => {
                instance.send(StorageCommand::CancelOneshotIngestion(ingestion_id));
            }
            None => {
                mz_ore::soft_panic_or_log!(
                    "canceling oneshot ingestion on non-existent cluster, ingestion {:?}, instance {}",
                    ingestion_id,
                    pending.cluster_id,
                );
            }
        }
        pending.cancel();

        Ok(())
    }

    async fn alter_export(
        &mut self,
        id: GlobalId,
        new_description: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let from_id = new_description.sink.from;

        let desired_read_holds = vec![from_id.clone(), id.clone()];
        let [input_hold, self_hold] = self
            .storage_collections
            .acquire_read_holds(desired_read_holds)
            .expect("missing dependency")
            .try_into()
            .expect("expected number of holds");
        let from_storage_metadata = self.storage_collections.collection_metadata(from_id)?;
        let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

        let cur_export = self.export_mut(id)?;
        let input_readable = cur_export
            .write_frontier
            .iter()
            .all(|t| input_hold.since().less_than(t));
        if !input_readable {
            return Err(StorageError::ReadBeforeSince(from_id));
        }

        let new_export = ExportState {
            read_capabilities: cur_export.read_capabilities.clone(),
            cluster_id: new_description.instance_id,
            derived_since: cur_export.derived_since.clone(),
            read_holds: [input_hold, self_hold],
            read_policy: cur_export.read_policy.clone(),
            write_frontier: cur_export.write_frontier.clone(),
        };
        *cur_export = new_export;

        let cmd = RunSinkCommand {
            id,
            description: StorageSinkDesc {
                from: from_id,
                from_desc: new_description.sink.from_desc,
                connection: new_description.sink.connection,
                envelope: new_description.sink.envelope,
                as_of: new_description.sink.as_of,
                version: new_description.sink.version,
                from_storage_metadata,
                with_snapshot: new_description.sink.with_snapshot,
                to_storage_metadata,
            },
        };

        let instance = self
            .instances
            .get_mut(&new_description.instance_id)
            .ok_or_else(|| StorageError::ExportInstanceMissing {
                storage_instance_id: new_description.instance_id,
                export_id: id,
            })?;

        instance.send(StorageCommand::RunSink(Box::new(cmd)));
        Ok(())
    }

    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut updates_by_instance =
            BTreeMap::<StorageInstanceId, Vec<(RunSinkCommand<T>, ExportDescription<T>)>>::new();

        for (id, connection) in exports {
            let (mut new_export_description, as_of): (ExportDescription<Self::Timestamp>, _) = {
                let export = &self.collections[&id];
                let DataSource::Sink { desc } = &export.data_source else {
                    panic!("export exists")
                };
                let CollectionStateExtra::Export(state) = &export.extra_state else {
                    panic!("export exists")
                };
                let export_description = desc.clone();
                let as_of = state.input_hold().since().clone();

                (export_description, as_of)
            };
            let current_sink = new_export_description.sink.clone();

            new_export_description.sink.connection = connection;

            current_sink.alter_compatible(id, &new_export_description.sink)?;

            let from_storage_metadata = self
                .storage_collections
                .collection_metadata(new_export_description.sink.from)?;
            let to_storage_metadata = self.storage_collections.collection_metadata(id)?;

            let cmd = RunSinkCommand {
                id,
                description: StorageSinkDesc {
                    from: new_export_description.sink.from,
                    from_desc: new_export_description.sink.from_desc.clone(),
                    connection: new_export_description.sink.connection.clone(),
                    envelope: new_export_description.sink.envelope,
                    with_snapshot: new_export_description.sink.with_snapshot,
                    version: new_export_description.sink.version,
                    as_of: as_of.to_owned(),
                    from_storage_metadata,
                    to_storage_metadata,
                },
            };

            let update = updates_by_instance
                .entry(new_export_description.instance_id)
                .or_default();
            update.push((cmd, new_export_description));
        }

        for (instance_id, updates) in updates_by_instance {
            let mut export_updates = BTreeMap::new();
            let mut cmds = Vec::with_capacity(updates.len());

            for (cmd, export_state) in updates {
                export_updates.insert(cmd.id, export_state);
                cmds.push(cmd);
            }

            let instance = self.instances.get_mut(&instance_id).ok_or_else(|| {
                StorageError::ExportInstanceMissing {
                    storage_instance_id: instance_id,
                    export_id: *export_updates
                        .keys()
                        .next()
                        .expect("set of exports not empty"),
                }
            })?;

            for cmd in cmds {
                instance.send(StorageCommand::RunSink(Box::new(cmd)));
            }

            for (id, new_export_description) in export_updates {
                let Some(state) = self.collections.get_mut(&id) else {
                    panic!("export known to exist")
                };
                let DataSource::Sink { desc } = &mut state.data_source else {
                    panic!("export known to exist")
                };
                *desc = new_export_description;
            }
        }

        Ok(())
    }

    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
            .into_iter()
            .partition(|id| match self.collections[id].data_source {
                DataSource::Table { .. } => true,
                DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                _ => panic!("identifier is not a table: {}", id),
            });

        if table_write_ids.len() > 0 {
            let drop_notif = self
                .persist_table_worker
                .drop_handles(table_write_ids.clone(), ts);
            let tx = self.pending_table_handle_drops_tx.clone();
            mz_ore::task::spawn(|| "table-cleanup".to_string(), async move {
                drop_notif.await;
                for identifier in table_write_ids {
                    let _ = tx.send(identifier);
                }
            });
        }

        if data_source_ids.len() > 0 {
            self.validate_collection_ids(data_source_ids.iter().cloned())?;
            self.drop_sources_unvalidated(storage_metadata, data_source_ids)?;
        }

        Ok(())
    }

    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_collection_ids(identifiers.iter().cloned())?;
        self.drop_sources_unvalidated(storage_metadata, identifiers)
    }

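    // "Unvalidated" because callers are expected to have already checked that the
    // identifiers refer to source collections (see `drop_sources`).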
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        ids: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut ingestions_to_execute = BTreeSet::new();
        let mut ingestions_to_drop = BTreeSet::new();
        let mut source_statistics_to_drop = Vec::new();

        let mut collections_to_drop = Vec::new();

        for id in ids.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let collection_state = self.collections.get(id);

            if let Some(collection_state) = collection_state {
                match collection_state.data_source {
                    DataSource::Webhook => {
                        let fut = self.collection_manager.unregister_collection(*id);
                        mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);

                        collections_to_drop.push(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Ingestion(_) => {
                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::IngestionExport { ingestion_id, .. } => {
                        ingestions_to_execute.insert(ingestion_id);

                        let ingestion_state = match self.collections.get_mut(&ingestion_id) {
                            Some(ingestion_collection) => ingestion_collection,
                            None => {
                                tracing::error!(
                                    "primary source {ingestion_id} seemingly dropped before subsource {id}"
                                );
                                continue;
                            }
                        };

                        match &mut ingestion_state.data_source {
                            DataSource::Ingestion(ingestion_desc) => {
                                let removed = ingestion_desc.source_exports.remove(id);
                                mz_ore::soft_assert_or_log!(
                                    removed.is_some(),
                                    "dropped subsource {id} already removed from source exports"
                                );
                            }
                            _ => unreachable!(
                                "SourceExport must only refer to primary sources that already exist"
                            ),
                        };

                        ingestions_to_drop.insert(*id);
                        source_statistics_to_drop.push(*id);
                    }
                    DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
                        collections_to_drop.push(*id);
                    }
                    DataSource::Introspection(_) | DataSource::Sink { .. } => {
                        soft_panic_or_log!(
                            "drop_sources called on a {:?} (id={id}))",
                            collection_state.data_source,
                        );
                    }
                }
            }
        }

        ingestions_to_execute.retain(|id| !ingestions_to_drop.contains(id));
        for ingestion_id in ingestions_to_execute {
            self.run_ingestion(ingestion_id)?;
        }

        let ingestion_policies = ingestions_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?ingestion_policies,
            "dropping sources by setting read hold policies"
        );
        self.set_hold_policies(ingestion_policies);

        let shards_to_update: BTreeSet<_> = ingestions_to_drop
            .iter()
            .chain(collections_to_drop.iter())
            .cloned()
            .collect();
        self.append_shard_mappings(shards_to_update.into_iter(), Diff::MINUS_ONE);

        let status_now = mz_ore::now::to_datetime((self.now)());
        let mut status_updates = vec![];
        for id in ingestions_to_drop.iter() {
            status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SourceStatusHistory,
                status_updates,
            );
        }

        {
            let mut source_statistics = self.source_statistics.lock().expect("poisoned");
            for id in source_statistics_to_drop {
                source_statistics.source_statistics.remove(&id);
                source_statistics.webhook_statistics.remove(&id);
            }
        }

        for id in ingestions_to_drop.iter().chain(collections_to_drop.iter()) {
            tracing::info!(%id, "dropping collection state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    match &collection.data_source {
                        DataSource::Ingestion(ingestion_desc) => {
                            self.dropped_objects.insert(
                                ingestion_desc.remap_collection_id,
                                active_replicas.clone(),
                            );
                        }
                        _ => {}
                    }

                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, ids);

        Ok(())
    }

    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.validate_export_ids(identifiers.iter().cloned())?;
        self.drop_sinks_unvalidated(storage_metadata, identifiers);
        Ok(())
    }

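    // As with sources, the validated wrapper is `drop_sinks`; this variant assumes
    // the identifiers have already been checked to be exports.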
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        mut sinks_to_drop: Vec<GlobalId>,
    ) {
        sinks_to_drop.retain(|id| self.export(*id).is_ok());

        let drop_policy = sinks_to_drop
            .iter()
            .map(|id| (*id, ReadPolicy::ValidFrom(Antichain::new())))
            .collect();

        tracing::debug!(
            ?drop_policy,
            "dropping sinks by setting read hold policies"
        );
        self.set_hold_policies(drop_policy);

        let status_now = mz_ore::now::to_datetime((self.now)());

        let mut status_updates = vec![];
        {
            let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
            for id in sinks_to_drop.iter() {
                status_updates.push(StatusUpdate::new(*id, status_now, Status::Dropped));
                sink_statistics.remove(id);
            }
        }

        if !self.read_only {
            self.append_status_introspection_updates(
                IntrospectionType::SinkStatusHistory,
                status_updates,
            );
        }

        for id in sinks_to_drop.iter() {
            tracing::info!(%id, "dropping export state");
            let collection = self
                .collections
                .remove(id)
                .expect("list populated after checking that self.collections contains it");

            let instance = match &collection.extra_state {
                CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
                CollectionStateExtra::Export(export) => Some(export.cluster_id()),
                CollectionStateExtra::None => None,
            }
            .and_then(|i| self.instances.get(&i));

            if let Some(instance) = instance {
                let active_replicas = instance.get_active_replicas_for_object(id);
                if !active_replicas.is_empty() {
                    self.dropped_objects.insert(*id, active_replicas);
                }
            }
        }

        self.storage_collections
            .drop_collections_unvalidated(storage_metadata, sinks_to_drop);
    }

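    // In read-only mode, table writes are only permitted for system collections
    // that are being migrated; all other appends are rejected with `ReadOnly`.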
    #[instrument(level = "debug")]
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    > {
        if self.read_only {
            if !commands
                .iter()
                .all(|(id, _)| id.is_system() && self.migrated_storage_collections.contains(id))
            {
                return Err(StorageError::ReadOnly);
            }
        }

        for (id, updates) in commands.iter() {
            if !updates.is_empty() {
                if !write_ts.less_than(&advance_to) {
                    return Err(StorageError::UpdateBeyondUpper(*id));
                }
            }
        }

        Ok(self
            .persist_table_worker
            .append(write_ts, advance_to, commands))
    }

    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>> {
        self.collection_manager.monotonic_appender(id)
    }

    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>> {
        let source_statistics = self.source_statistics.lock().expect("poisoned");
        source_statistics
            .webhook_statistics
            .get(&id)
            .cloned()
            .ok_or(StorageError::IdentifierMissing(id))
    }

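    // Resolves as soon as there is work for `process` to do: previously scheduled
    // maintenance, pending table-handle drops, newly received instance responses,
    // or a maintenance tick.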
    async fn ready(&mut self) {
        if self.maintenance_scheduled {
            return;
        }

        if !self.pending_table_handle_drops_rx.is_empty() {
            return;
        }

        tokio::select! {
            Some(m) = self.instance_response_rx.recv() => {
                self.stashed_responses.push(m);
                while let Ok(m) = self.instance_response_rx.try_recv() {
                    self.stashed_responses.push(m);
                }
            }
            _ = self.maintenance_ticker.tick() => {
                self.maintenance_scheduled = true;
            },
        };
    }

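    // Drains stashed instance responses, applying frontier updates, dropped-ID
    // bookkeeping, statistics, and status updates, then records status history and
    // processes pending table-handle drops.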
2213 #[instrument(level = "debug")]
2214 fn process(
2215 &mut self,
2216 storage_metadata: &StorageMetadata,
2217 ) -> Result<Option<Response<T>>, anyhow::Error> {
2218 if self.maintenance_scheduled {
2220 self.maintain();
2221 self.maintenance_scheduled = false;
2222 }
2223
2224 for instance in self.instances.values_mut() {
2225 instance.rehydrate_failed_replicas();
2226 }
2227
2228 let mut status_updates = vec![];
2229 let mut updated_frontiers = BTreeMap::new();
2230
2231 let stashed_responses = std::mem::take(&mut self.stashed_responses);
2233 for resp in stashed_responses {
2234 match resp {
2235 (_replica_id, StorageResponse::FrontierUpper(id, upper)) => {
2236 self.update_write_frontier(id, &upper);
2237 updated_frontiers.insert(id, upper);
2238 }
2239 (replica_id, StorageResponse::DroppedId(id)) => {
2240 let replica_id = replica_id.expect("DroppedId from unknown replica");
2241 if let Some(remaining_replicas) = self.dropped_objects.get_mut(&id) {
2242 remaining_replicas.remove(&replica_id);
2243 if remaining_replicas.is_empty() {
2244 self.dropped_objects.remove(&id);
2245 }
2246 } else {
2247 soft_panic_or_log!("unexpected DroppedId for {id}");
2248 }
2249 }
2250 (_replica_id, StorageResponse::StatisticsUpdates(source_stats, sink_stats)) => {
2251 {
2259 let mut shared_stats = self.source_statistics.lock().expect("poisoned");
2260 for stat in source_stats {
2261 shared_stats
2263 .source_statistics
2264 .entry(stat.id)
2265 .and_modify(|current| current.stat().incorporate(stat));
2266 }
2267 }
2268
2269 {
2270 let mut shared_stats = self.sink_statistics.lock().expect("poisoned");
2271 for stat in sink_stats {
2272 shared_stats
2274 .entry(stat.id)
2275 .and_modify(|current| current.stat().incorporate(stat));
2276 }
2277 }
2278 }
2279 (replica_id, StorageResponse::StatusUpdate(mut status_update)) => {
2280 match status_update.status {
2296 Status::Running => {
2297 let collection = self.collections.get_mut(&status_update.id);
2298 match collection {
2299 Some(collection) => {
2300 match collection.extra_state {
2301 CollectionStateExtra::Ingestion(
2302 ref mut ingestion_state,
2303 ) => {
2304 if ingestion_state.hydrated_on.is_empty() {
2305 tracing::debug!(ingestion_id = %status_update.id, "ingestion is hydrated");
2306 }
2307 ingestion_state.hydrated_on.insert(replica_id.expect(
2308 "replica id should be present for status running",
2309 ));
2310 }
                                        // Only ingestions track per-replica hydration status.
                                        CollectionStateExtra::Export(_) => (),
                                        CollectionStateExtra::None => (),
                                    }
                                }
                                None => (),
                            }
2322 }
2323 Status::Paused => {
2324 let collection = self.collections.get_mut(&status_update.id);
2325 match collection {
2326 Some(collection) => {
2327 match collection.extra_state {
2328 CollectionStateExtra::Ingestion(
2329 ref mut ingestion_state,
2330 ) => {
2331 tracing::debug!(ingestion_id = %status_update.id, "ingestion is now paused");
2338 ingestion_state.hydrated_on.clear();
2339 }
                                        CollectionStateExtra::Export(_) => (),
                                        CollectionStateExtra::None => (),
                                    }
                                }
                                None => (),
                            }
2351 }
2352 _ => (),
2353 }
2354
2355 if let Some(id) = replica_id {
2357 status_update.replica_id = Some(id);
2358 }
2359 status_updates.push(status_update);
2360 }
2361 (_replica_id, StorageResponse::StagedBatches(batches)) => {
2362 for (ingestion_id, batches) in batches {
2363 match self.pending_oneshot_ingestions.remove(&ingestion_id) {
2364 Some(pending) => {
2365 if let Some(instance) = self.instances.get_mut(&pending.cluster_id)
2368 {
2369 instance
2370 .send(StorageCommand::CancelOneshotIngestion(ingestion_id));
2371 }
2372 (pending.result_tx)(batches)
2374 }
2375 None => mz_ore::soft_panic_or_log!("no sender for {ingestion_id}!"),
2378 }
2379 }
2380 }
2381 }
2382 }
2383
2384 self.record_status_updates(status_updates);
2385
2386 let mut dropped_table_ids = Vec::new();
2388 while let Ok(dropped_id) = self.pending_table_handle_drops_rx.try_recv() {
2389 dropped_table_ids.push(dropped_id);
2390 }
2391 if !dropped_table_ids.is_empty() {
2392 self.drop_sources(storage_metadata, dropped_table_ids)?;
2393 }
2394
2395 if updated_frontiers.is_empty() {
2396 Ok(None)
2397 } else {
2398 Ok(Some(Response::FrontierUpdates(
2399 updated_frontiers.into_iter().collect(),
2400 )))
2401 }
2402 }
2403
2404 async fn inspect_persist_state(
2405 &self,
2406 id: GlobalId,
2407 ) -> Result<serde_json::Value, anyhow::Error> {
2408 let collection = &self.storage_collections.collection_metadata(id)?;
2409 let client = self
2410 .persist
2411 .open(collection.persist_location.clone())
2412 .await?;
2413 let shard_state = client
2414 .inspect_shard::<Self::Timestamp>(&collection.data_shard)
2415 .await?;
2416 let json_state = serde_json::to_value(shard_state)?;
2417 Ok(json_state)
2418 }
2419
2420 fn append_introspection_updates(
2421 &mut self,
2422 type_: IntrospectionType,
2423 updates: Vec<(Row, Diff)>,
2424 ) {
2425 let id = self.introspection_ids[&type_];
2426 let updates = updates.into_iter().map(|update| update.into()).collect();
2427 self.collection_manager.blind_write(id, updates);
2428 }
2429
2430 fn append_status_introspection_updates(
2431 &mut self,
2432 type_: IntrospectionType,
2433 updates: Vec<StatusUpdate>,
2434 ) {
2435 let id = self.introspection_ids[&type_];
2436 let updates: Vec<_> = updates.into_iter().map(|update| update.into()).collect();
2437 if !updates.is_empty() {
2438 self.collection_manager.blind_write(id, updates);
2439 }
2440 }
2441
2442 fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp) {
2443 let id = self.introspection_ids[&type_];
2444 self.collection_manager.differential_write(id, op);
2445 }
2446
2447 fn append_only_introspection_tx(
2448 &self,
2449 type_: IntrospectionType,
2450 ) -> mpsc::UnboundedSender<(
2451 Vec<AppendOnlyUpdate>,
2452 oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2453 )> {
2454 let id = self.introspection_ids[&type_];
2455 self.collection_manager.append_only_write_sender(id)
2456 }
2457
2458 fn differential_introspection_tx(
2459 &self,
2460 type_: IntrospectionType,
2461 ) -> mpsc::UnboundedSender<(
2462 StorageWriteOp,
2463 oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
2464 )> {
2465 let id = self.introspection_ids[&type_];
2466 self.collection_manager.differential_write_sender(id)
2467 }
2468
2469 async fn real_time_recent_timestamp(
2470 &self,
2471 timestamp_objects: BTreeSet<GlobalId>,
2472 timeout: Duration,
2473 ) -> Result<
2474 BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
2475 StorageError<Self::Timestamp>,
2476 > {
2477 use mz_storage_types::sources::GenericSourceConnection;
2478
2479 let mut rtr_futures = BTreeMap::new();
2480
2481 for id in timestamp_objects.into_iter().filter(GlobalId::is_user) {
2483 let collection = match self.collection(id) {
2484 Ok(c) => c,
2485 Err(_) => continue,
2487 };
2488
2489 let (source_conn, remap_id) = match &collection.data_source {
2490 DataSource::Ingestion(IngestionDescription {
2491 desc: SourceDesc { connection, .. },
2492 remap_collection_id,
2493 ..
2494 }) => match connection {
2495 GenericSourceConnection::Kafka(_)
2496 | GenericSourceConnection::Postgres(_)
2497 | GenericSourceConnection::MySql(_)
2498 | GenericSourceConnection::SqlServer(_) => {
2499 (connection.clone(), *remap_collection_id)
2500 }
2501
2502 GenericSourceConnection::LoadGenerator(_) => continue,
2507 },
2508 _ => {
2510 continue;
2511 }
2512 };
2513
2514 let config = self.config().clone();
2516
2517 let read_handle = self.read_handle_for_snapshot(remap_id).await?;
2525
2526 let remap_read_hold = self
2529 .storage_collections
2530 .acquire_read_holds(vec![remap_id])
2531 .map_err(|_e| StorageError::ReadBeforeSince(remap_id))?
2532 .expect_element(|| "known to be exactly one");
2533
2534 let remap_as_of = remap_read_hold
2535 .since()
2536 .to_owned()
2537 .into_option()
2538 .ok_or(StorageError::ReadBeforeSince(remap_id))?;
2539
2540 rtr_futures.insert(
2541 id,
2542 tokio::time::timeout(timeout, async move {
2543 use mz_storage_types::sources::SourceConnection as _;
2544
2545 let as_of = Antichain::from_elem(remap_as_of);
2548 let remap_subscribe = read_handle
2549 .subscribe(as_of.clone())
2550 .await
2551 .map_err(|_| StorageError::ReadBeforeSince(remap_id))?;
2552
2553 tracing::debug!(?id, type_ = source_conn.name(), upstream = ?source_conn.external_reference(), "fetching real time recency");
2554
2555 let result = rtr::real_time_recency_ts(source_conn, id, config, as_of, remap_subscribe)
2556 .await.map_err(|e| {
2557 tracing::debug!(?id, "real time recency error: {:?}", e);
2558 e
2559 });
2560
2561 drop(remap_read_hold);
2563
2564 result
2565 }),
2566 );
2567 }
2568
2569 Ok(Box::pin(async move {
2570 let (ids, futs): (Vec<_>, Vec<_>) = rtr_futures.into_iter().unzip();
2571 ids.into_iter()
2572 .zip_eq(futures::future::join_all(futs).await)
2573 .try_fold(T::minimum(), |curr, (id, per_source_res)| {
2574 let new =
2575 per_source_res.map_err(|_e: Elapsed| StorageError::RtrTimeout(id))??;
2576 Ok::<_, StorageError<Self::Timestamp>>(std::cmp::max(curr, new))
2577 })
2578 }))
2579 }
2580}
2581
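/// Ensures the durable state required by the storage controller exists in the
/// given [`StorageTxn`]: if no txn-WAL shard has been allocated yet, a fresh
/// [`ShardId`] is generated and recorded. Must be called before
/// [`Controller::new`].
///
/// A minimal ordering sketch (arguments elided, not a complete call):
///
/// ```ignore
/// prepare_initialization(&mut txn)?;
/// let controller = Controller::new(/* config... */, &txn, storage_collections).await;
/// ```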
2582pub fn prepare_initialization<T>(txn: &mut dyn StorageTxn<T>) -> Result<(), StorageError<T>> {
2589 if txn.get_txn_wal_shard().is_none() {
2590 let txns_id = ShardId::new();
2591 txn.write_txn_wal_shard(txns_id)?;
2592 }
2593
2594 Ok(())
2595}
2596
2597impl<T> Controller<T>
2598where
2599 T: Timestamp
2600 + Lattice
2601 + TotalOrder
2602 + Codec64
2603 + From<EpochMillis>
2604 + TimestampManipulation
2605 + Into<Datum<'static>>,
2606 StorageCommand<T>: RustType<ProtoStorageCommand>,
2607 StorageResponse<T>: RustType<ProtoStorageResponse>,
2608 Self: StorageController<Timestamp = T>,
2609{
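    /// Creates a new storage controller from the given configuration.
    ///
    /// Expects [`prepare_initialization`] to have been called on `txn` beforehand
    /// and panics otherwise. In `read_only` mode the persist table worker only
    /// follows the txns shard's upper instead of registering as a writer.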
2610 pub async fn new(
2618 build_info: &'static BuildInfo,
2619 persist_location: PersistLocation,
2620 persist_clients: Arc<PersistClientCache>,
2621 now: NowFn,
2622 wallclock_lag: WallclockLagFn<T>,
2623 txns_metrics: Arc<TxnMetrics>,
2624 envd_epoch: NonZeroI64,
2625 read_only: bool,
2626 metrics_registry: &MetricsRegistry,
2627 controller_metrics: ControllerMetrics,
2628 connection_context: ConnectionContext,
2629 txn: &dyn StorageTxn<T>,
2630 storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
2631 ) -> Self {
2632 let txns_client = persist_clients
2633 .open(persist_location.clone())
2634 .await
2635 .expect("location should be valid");
2636
2637 let persist_warm_task = warm_persist_state_in_background(
2638 txns_client.clone(),
2639 txn.get_collection_metadata().into_values(),
2640 );
2641 let persist_warm_task = Some(persist_warm_task.abort_on_drop());
2642
2643 let txns_id = txn
2647 .get_txn_wal_shard()
2648 .expect("must call prepare initialization before creating storage controller");
2649
2650 let persist_table_worker = if read_only {
2651 let txns_write = txns_client
2652 .open_writer(
2653 txns_id,
2654 Arc::new(TxnsCodecRow::desc()),
2655 Arc::new(UnitSchema),
2656 Diagnostics {
2657 shard_name: "txns".to_owned(),
2658 handle_purpose: "follow txns upper".to_owned(),
2659 },
2660 )
2661 .await
2662 .expect("txns schema shouldn't change");
2663 persist_handles::PersistTableWriteWorker::new_read_only_mode(txns_write)
2664 } else {
2665 let txns = TxnsHandle::open(
2666 T::minimum(),
2667 txns_client.clone(),
2668 txns_client.dyncfgs().clone(),
2669 Arc::clone(&txns_metrics),
2670 txns_id,
2671 )
2672 .await;
2673 persist_handles::PersistTableWriteWorker::new_txns(txns)
2674 };
2675 let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
2676
2677 let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone());
2678
2679 let introspection_ids = BTreeMap::new();
2680 let introspection_tokens = Arc::new(Mutex::new(BTreeMap::new()));
2681
2682 let (statistics_interval_sender, _) =
2683 channel(mz_storage_types::parameters::STATISTICS_INTERVAL_DEFAULT);
2684
2685 let (pending_table_handle_drops_tx, pending_table_handle_drops_rx) =
2686 tokio::sync::mpsc::unbounded_channel();
2687
2688 let mut maintenance_ticker = tokio::time::interval(Duration::from_secs(1));
2689 maintenance_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2690
2691 let (instance_response_tx, instance_response_rx) = mpsc::unbounded_channel();
2692
2693 let metrics = StorageControllerMetrics::new(metrics_registry, controller_metrics);
2694
2695 let now_dt = mz_ore::now::to_datetime(now());
2696
2697 Self {
2698 build_info,
2699 collections: BTreeMap::default(),
2700 dropped_objects: Default::default(),
2701 persist_table_worker,
2702 txns_read,
2703 txns_metrics,
2704 stashed_responses: vec![],
2705 pending_table_handle_drops_tx,
2706 pending_table_handle_drops_rx,
2707 pending_oneshot_ingestions: BTreeMap::default(),
2708 collection_manager,
2709 introspection_ids,
2710 introspection_tokens,
2711 now,
2712 envd_epoch,
2713 read_only,
2714 source_statistics: Arc::new(Mutex::new(statistics::SourceStatistics {
2715 source_statistics: BTreeMap::new(),
2716 webhook_statistics: BTreeMap::new(),
2717 })),
2718 sink_statistics: Arc::new(Mutex::new(BTreeMap::new())),
2719 statistics_interval_sender,
2720 instances: BTreeMap::new(),
2721 initialized: false,
2722 config: StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs()),
2723 persist_location,
2724 persist: persist_clients,
2725 metrics,
2726 recorded_frontiers: BTreeMap::new(),
2727 recorded_replica_frontiers: BTreeMap::new(),
2728 wallclock_lag,
2729 wallclock_lag_last_recorded: now_dt,
2730 storage_collections,
2731 migrated_storage_collections: BTreeSet::new(),
2732 maintenance_ticker,
2733 maintenance_scheduled: false,
2734 instance_response_rx,
2735 instance_response_tx,
2736 persist_warm_task,
2737 }
2738 }
2739
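    /// Installs new read hold policies for the given ingestion and export
    /// collections, recomputing their derived sinces from the current write
    /// frontiers and applying any resulting read capability changes.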
2740 #[instrument(level = "debug")]
2748 fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
2749 let mut read_capability_changes = BTreeMap::default();
2750
2751 for (id, policy) in policies.into_iter() {
2752 if let Some(collection) = self.collections.get_mut(&id) {
2753 let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state
2754 {
2755 CollectionStateExtra::Ingestion(ingestion) => (
2756 ingestion.write_frontier.borrow(),
2757 &mut ingestion.derived_since,
2758 &mut ingestion.hold_policy,
2759 ),
2760 CollectionStateExtra::None => {
2761 unreachable!("set_hold_policies is only called for ingestions");
2762 }
2763 CollectionStateExtra::Export(export) => (
2764 export.write_frontier.borrow(),
2765 &mut export.derived_since,
2766 &mut export.read_policy,
2767 ),
2768 };
2769
2770 let new_derived_since = policy.frontier(write_frontier);
2771 let mut update = swap_updates(derived_since, new_derived_since);
2772 if !update.is_empty() {
2773 read_capability_changes.insert(id, update);
2774 }
2775
2776 *hold_policy = policy;
2777 }
2778 }
2779
2780 if !read_capability_changes.is_empty() {
2781 self.update_hold_capabilities(&mut read_capability_changes);
2782 }
2783 }
2784
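    /// Advances the tracked write frontier of the given ingestion or export to
    /// `new_upper` (if it is ahead) and recomputes the derived since according
    /// to the collection's hold policy, applying any resulting capability changes.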
2785 #[instrument(level = "debug", fields(updates))]
2786 fn update_write_frontier(&mut self, id: GlobalId, new_upper: &Antichain<T>) {
2787 let mut read_capability_changes = BTreeMap::default();
2788
2789 if let Some(collection) = self.collections.get_mut(&id) {
2790 let (write_frontier, derived_since, hold_policy) = match &mut collection.extra_state {
2791 CollectionStateExtra::Ingestion(ingestion) => (
2792 &mut ingestion.write_frontier,
2793 &mut ingestion.derived_since,
2794 &ingestion.hold_policy,
2795 ),
                CollectionStateExtra::None => {
                    if !matches!(collection.data_source, DataSource::Progress) {
                        tracing::error!(
                            ?collection,
                            ?new_upper,
                            "updated write frontier for collection which is not an ingestion"
                        );
                    }
                    return;
                }
2808 CollectionStateExtra::Export(export) => (
2809 &mut export.write_frontier,
2810 &mut export.derived_since,
2811 &export.read_policy,
2812 ),
2813 };
2814
2815 if PartialOrder::less_than(write_frontier, new_upper) {
2816 write_frontier.clone_from(new_upper);
2817 }
2818
2819 let new_derived_since = hold_policy.frontier(write_frontier.borrow());
2820 let mut update = swap_updates(derived_since, new_derived_since);
2821 if !update.is_empty() {
2822 read_capability_changes.insert(id, update);
2823 }
        } else if self.dropped_objects.contains_key(&id) {
            // Frontier updates may still arrive from replicas that have not yet
            // acknowledged the drop of this object; ignore them.
        } else {
            soft_panic_or_log!("spurious upper update for {id}: {new_upper:?}");
        }
2830
2831 if !read_capability_changes.is_empty() {
2832 self.update_hold_capabilities(&mut read_capability_changes);
2833 }
2834 }
2835
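    /// Applies batched read capability changes: updates each collection's read
    /// capabilities, downgrades its dependency read holds to the new frontier,
    /// and sends `AllowCompaction` to the owning storage instance.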
2836 #[instrument(level = "debug", fields(updates))]
2840 fn update_hold_capabilities(&mut self, updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>) {
2841 let mut collections_net = BTreeMap::new();
2843
2844 while let Some(key) = updates.keys().rev().next().cloned() {
2849 let mut update = updates.remove(&key).unwrap();
2850
2851 if key.is_user() {
2852 debug!(id = %key, ?update, "update_hold_capability");
2853 }
2854
2855 if let Some(collection) = self.collections.get_mut(&key) {
2856 match &mut collection.extra_state {
2857 CollectionStateExtra::Ingestion(ingestion) => {
2858 let changes = ingestion.read_capabilities.update_iter(update.drain());
2859 update.extend(changes);
2860
2861 let (changes, frontier, _cluster_id) =
2862 collections_net.entry(key).or_insert_with(|| {
2863 (
2864 <ChangeBatch<_>>::new(),
2865 Antichain::new(),
2866 ingestion.instance_id,
2867 )
2868 });
2869
2870 changes.extend(update.drain());
2871 *frontier = ingestion.read_capabilities.frontier().to_owned();
2872 }
2873 CollectionStateExtra::None => {
2874 soft_panic_or_log!(
2876 "trying to update holds for collection {collection:?} which is not \
2877 an ingestion: {update:?}"
2878 );
2879 continue;
2880 }
2881 CollectionStateExtra::Export(export) => {
2882 let changes = export.read_capabilities.update_iter(update.drain());
2883 update.extend(changes);
2884
2885 let (changes, frontier, _cluster_id) =
2886 collections_net.entry(key).or_insert_with(|| {
2887 (<ChangeBatch<_>>::new(), Antichain::new(), export.cluster_id)
2888 });
2889
2890 changes.extend(update.drain());
2891 *frontier = export.read_capabilities.frontier().to_owned();
2892 }
2893 }
2894 } else {
2895 tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
2897 }
2898 }
2899
2900 for (key, (mut changes, frontier, cluster_id)) in collections_net {
2903 if !changes.is_empty() {
2904 if key.is_user() {
2905 debug!(id = %key, ?frontier, "downgrading ingestion read holds!");
2906 }
2907
2908 let collection = self
2909 .collections
2910 .get_mut(&key)
2911 .expect("missing collection state");
2912
2913 let read_holds = match &mut collection.extra_state {
2914 CollectionStateExtra::Ingestion(ingestion) => {
2915 ingestion.dependency_read_holds.as_mut_slice()
2916 }
2917 CollectionStateExtra::Export(export) => export.read_holds.as_mut_slice(),
2918 CollectionStateExtra::None => {
2919 soft_panic_or_log!(
2920 "trying to downgrade read holds for collection which is not an \
2921 ingestion: {collection:?}"
2922 );
2923 continue;
2924 }
2925 };
2926
2927 for read_hold in read_holds.iter_mut() {
2928 read_hold
2929 .try_downgrade(frontier.clone())
2930 .expect("we only advance the frontier");
2931 }
2932
2933 if let Some(instance) = self.instances.get_mut(&cluster_id) {
2935 instance.send(StorageCommand::AllowCompaction(key, frontier.clone()));
2936 } else {
2937 soft_panic_or_log!(
2938 "missing instance client for cluster {cluster_id} while we still have outstanding AllowCompaction command {frontier:?} for {key}"
2939 );
2940 }
2941 }
2942 }
2943 }
2944
2945 fn validate_collection_ids(
2947 &self,
2948 ids: impl Iterator<Item = GlobalId>,
2949 ) -> Result<(), StorageError<T>> {
2950 for id in ids {
2951 self.storage_collections.check_exists(id)?;
2952 }
2953 Ok(())
2954 }
2955
2956 fn validate_export_ids(
2958 &self,
2959 ids: impl Iterator<Item = GlobalId>,
2960 ) -> Result<(), StorageError<T>> {
2961 for id in ids {
2962 self.export(id)?;
2963 }
2964 Ok(())
2965 }
2966
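    /// Opens a persist [`WriteHandle`] for the given collection's data shard and
    /// fetches the most recent upper, so the handle's cached upper is current.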
2967 async fn open_data_handles(
2975 &self,
2976 id: &GlobalId,
2977 shard: ShardId,
2978 relation_desc: RelationDesc,
2979 persist_client: &PersistClient,
2980 ) -> WriteHandle<SourceData, (), T, StorageDiff> {
2981 let diagnostics = Diagnostics {
2982 shard_name: id.to_string(),
2983 handle_purpose: format!("controller data for {}", id),
2984 };
2985
2986 let mut write = persist_client
2987 .open_writer(
2988 shard,
2989 Arc::new(relation_desc),
2990 Arc::new(UnitSchema),
2991 diagnostics.clone(),
2992 )
2993 .await
2994 .expect("invalid persist usage");
2995
2996 write.fetch_recent_upper().await;
3005
3006 write
3007 }
3008
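    /// Registers the given write handle with the collection manager as either a
    /// differential or an append-only introspection collection, depending on the
    /// introspection type. In read-only mode, writing is only forced for migrated
    /// system collections.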
3009 fn register_introspection_collection(
3014 &mut self,
3015 id: GlobalId,
3016 introspection_type: IntrospectionType,
3017 write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
3018 persist_client: PersistClient,
3019 ) -> Result<(), StorageError<T>> {
3020 tracing::info!(%id, ?introspection_type, "registering introspection collection");
3021
3022 let force_writable = self.read_only && self.migrated_storage_collections.contains(&id);
3026 if force_writable {
3027 assert!(id.is_system(), "unexpected non-system global id: {id:?}");
3028 info!("writing to migrated storage collection {id} in read-only mode");
3029 }
3030
3031 let prev = self.introspection_ids.insert(introspection_type, id);
3032 assert!(
3033 prev.is_none(),
3034 "cannot have multiple IDs for introspection type"
3035 );
3036
3037 let metadata = self.storage_collections.collection_metadata(id)?.clone();
3038
3039 let read_handle_fn = move || {
3040 let persist_client = persist_client.clone();
3041 let metadata = metadata.clone();
3042
3043 let fut = async move {
3044 let read_handle = persist_client
3045 .open_leased_reader::<SourceData, (), T, StorageDiff>(
3046 metadata.data_shard,
3047 Arc::new(metadata.relation_desc.clone()),
3048 Arc::new(UnitSchema),
3049 Diagnostics {
3050 shard_name: id.to_string(),
3051 handle_purpose: format!("snapshot {}", id),
3052 },
3053 USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3054 )
3055 .await
3056 .expect("invalid persist usage");
3057 read_handle
3058 };
3059
3060 fut.boxed()
3061 };
3062
3063 let recent_upper = write_handle.shared_upper();
3064
3065 match CollectionManagerKind::from(&introspection_type) {
3066 CollectionManagerKind::Differential => {
3071 let introspection_config = DifferentialIntrospectionConfig {
3073 recent_upper,
3074 introspection_type,
3075 storage_collections: Arc::clone(&self.storage_collections),
3076 collection_manager: self.collection_manager.clone(),
3077 source_statistics: Arc::clone(&self.source_statistics),
3078 sink_statistics: Arc::clone(&self.sink_statistics),
3079 statistics_interval: self.config.parameters.statistics_interval.clone(),
3080 statistics_interval_receiver: self.statistics_interval_sender.subscribe(),
3081 metrics: self.metrics.clone(),
3082 introspection_tokens: Arc::clone(&self.introspection_tokens),
3083 };
3084 self.collection_manager.register_differential_collection(
3085 id,
3086 write_handle,
3087 read_handle_fn,
3088 force_writable,
3089 introspection_config,
3090 );
3091 }
3092 CollectionManagerKind::AppendOnly => {
3100 let introspection_config = AppendOnlyIntrospectionConfig {
3101 introspection_type,
3102 config_set: Arc::clone(self.config.config_set()),
3103 parameters: self.config.parameters.clone(),
3104 storage_collections: Arc::clone(&self.storage_collections),
3105 };
3106 self.collection_manager.register_append_only_collection(
3107 id,
3108 write_handle,
3109 force_writable,
3110 Some(introspection_config),
3111 );
3112 }
3113 }
3114
3115 Ok(())
3116 }
3117
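    /// Removes statistics state for sources and sinks that no longer exist.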
3118 fn reconcile_dangling_statistics(&self) {
3121 self.source_statistics
3122 .lock()
3123 .expect("poisoned")
3124 .source_statistics
3125 .retain(|k, _| self.storage_collections.check_exists(*k).is_ok());
3127 self.sink_statistics
3128 .lock()
3129 .expect("poisoned")
3130 .retain(|k, _| self.export(*k).is_ok());
3131 }
3132
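    /// Appends `(GlobalId, ShardId)` mappings for the given collections to the
    /// `ShardMapping` introspection collection. `diff` must be `Diff::ONE` for
    /// inserts or `Diff::MINUS_ONE` for retractions. Panics if a given ID is
    /// unknown or the introspection collection is not yet registered.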
3133 #[instrument(level = "debug")]
3143 fn append_shard_mappings<I>(&self, global_ids: I, diff: Diff)
3144 where
3145 I: Iterator<Item = GlobalId>,
3146 {
3147 mz_ore::soft_assert_or_log!(
3148 diff == Diff::MINUS_ONE || diff == Diff::ONE,
3149 "use 1 for insert or -1 for delete"
3150 );
3151
3152 let id = *self
3153 .introspection_ids
3154 .get(&IntrospectionType::ShardMapping)
3155 .expect("should be registered before this call");
3156
3157 let mut updates = vec![];
3158 let mut row_buf = Row::default();
3160
3161 for global_id in global_ids {
3162 let shard_id = if let Some(collection) = self.collections.get(&global_id) {
3163 collection.collection_metadata.data_shard.clone()
3164 } else {
3165 panic!("unknown global id: {}", global_id);
3166 };
3167
3168 let mut packer = row_buf.packer();
3169 packer.push(Datum::from(global_id.to_string().as_str()));
3170 packer.push(Datum::from(shard_id.to_string().as_str()));
3171 updates.push((row_buf.clone(), diff));
3172 }
3173
3174 self.collection_manager.differential_append(id, updates);
3175 }
3176
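    /// Returns the collections whose frontiers the given collection depends on:
    /// tables with a primary depend on that primary; ingestions and ingestion
    /// exports depend on themselves and the ingestion's remap collection; sinks
    /// depend on themselves and the collection they read from; all other data
    /// sources have no dependencies.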
3177 fn determine_collection_dependencies(
3179 &self,
3180 self_id: GlobalId,
3181 data_source: &DataSource<T>,
3182 ) -> Result<Vec<GlobalId>, StorageError<T>> {
3183 let dependency = match &data_source {
3184 DataSource::Introspection(_)
3185 | DataSource::Webhook
3186 | DataSource::Table { primary: None }
3187 | DataSource::Progress
3188 | DataSource::Other => vec![],
3189 DataSource::Table {
3190 primary: Some(primary),
3191 } => vec![*primary],
3192 DataSource::IngestionExport { ingestion_id, .. } => {
3193 let source_collection = self.collection(*ingestion_id)?;
3196 let ingestion_remap_collection_id = match &source_collection.data_source {
3197 DataSource::Ingestion(ingestion) => ingestion.remap_collection_id,
3198 _ => unreachable!(
3199 "SourceExport must only refer to primary sources that already exist"
3200 ),
3201 };
3202
3203 vec![self_id, ingestion_remap_collection_id]
3209 }
3210 DataSource::Ingestion(ingestion) => {
3212 vec![self_id, ingestion.remap_collection_id]
3217 }
3218 DataSource::Sink { desc } => {
3219 vec![self_id, desc.sink.from]
3221 }
3222 };
3223
3224 Ok(dependency)
3225 }
3226
3227 async fn read_handle_for_snapshot(
3228 &self,
3229 id: GlobalId,
3230 ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
3231 let metadata = self.storage_collections.collection_metadata(id)?;
3232 read_handle_for_snapshot(&self.persist, id, &metadata).await
3233 }
3234
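    /// Routes the given status updates to the source or sink status history
    /// introspection collections, depending on whether the ID refers to an
    /// export or a storage collection. Does nothing in read-only mode.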
3235 fn record_status_updates(&mut self, updates: Vec<StatusUpdate>) {
3238 if self.read_only {
3239 return;
3240 }
3241
3242 let mut sink_status_updates = vec![];
3243 let mut source_status_updates = vec![];
3244
3245 for update in updates {
3246 let id = update.id;
3247 if self.export(id).is_ok() {
3248 sink_status_updates.push(update);
3249 } else if self.storage_collections.check_exists(id).is_ok() {
3250 source_status_updates.push(update);
3251 }
3252 }
3253
3254 self.append_status_introspection_updates(
3255 IntrospectionType::SourceStatusHistory,
3256 source_status_updates,
3257 );
3258 self.append_status_introspection_updates(
3259 IntrospectionType::SinkStatusHistory,
3260 sink_status_updates,
3261 );
3262 }
3263
3264 fn collection(&self, id: GlobalId) -> Result<&CollectionState<T>, StorageError<T>> {
3265 self.collections
3266 .get(&id)
3267 .ok_or(StorageError::IdentifierMissing(id))
3268 }
3269
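    /// Builds the `RunIngestion` command for the given ingestion, filling in the
    /// per-export collection metadata, and sends it to the ingestion's storage
    /// instance. Errors if the ID is not an ingestion or the instance is missing.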
3270 fn run_ingestion(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3273 tracing::info!(%id, "starting ingestion");
3274
3275 let collection = self.collection(id)?;
3276 let ingestion_description = match &collection.data_source {
3277 DataSource::Ingestion(i) => i.clone(),
3278 _ => {
3279 tracing::warn!("run_ingestion called on non-ingestion ID {}", id);
3280 Err(StorageError::IdentifierInvalid(id))?
3281 }
3282 };
3283
3284 let mut source_exports = BTreeMap::new();
3286 for (
3287 export_id,
3288 SourceExport {
3289 storage_metadata: (),
3290 details,
3291 data_config,
3292 },
3293 ) in ingestion_description.source_exports.clone()
3294 {
3295 let export_storage_metadata = self.collection(export_id)?.collection_metadata.clone();
3296 source_exports.insert(
3297 export_id,
3298 SourceExport {
3299 storage_metadata: export_storage_metadata,
3300 details,
3301 data_config,
3302 },
3303 );
3304 }
3305
3306 let description = IngestionDescription::<CollectionMetadata> {
3307 source_exports,
3308 ingestion_metadata: collection.collection_metadata.clone(),
3311 desc: ingestion_description.desc.clone(),
3313 instance_id: ingestion_description.instance_id,
3314 remap_collection_id: ingestion_description.remap_collection_id,
3315 };
3316
3317 let storage_instance_id = description.instance_id;
3318 let instance = self
3320 .instances
3321 .get_mut(&storage_instance_id)
3322 .ok_or_else(|| StorageError::IngestionInstanceMissing {
3323 storage_instance_id,
3324 ingestion_id: id,
3325 })?;
3326
3327 let augmented_ingestion = Box::new(RunIngestionCommand { id, description });
3328 instance.send(StorageCommand::RunIngestion(augmented_ingestion));
3329
3330 Ok(())
3331 }
3332
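    /// Builds the `RunSink` command for the given export and sends it to its
    /// storage instance. The effective `as_of` is the sink's `as_of` joined with
    /// the export's implied capability; if the snapshot-frontier dyncfg is
    /// enabled and that `as_of` is strictly behind the export's write frontier,
    /// the snapshot is skipped because it has already been written.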
3333 fn run_export(&mut self, id: GlobalId) -> Result<(), StorageError<T>> {
3336 let DataSource::Sink { desc: description } = &self.collections[&id].data_source else {
3337 return Err(StorageError::IdentifierMissing(id));
3338 };
3339
3340 let from_storage_metadata = self
3341 .storage_collections
3342 .collection_metadata(description.sink.from)?;
3343 let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
3344
3345 let enable_snapshot_frontier =
3349 dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER.get(self.config().config_set());
3350 let export_state = self.storage_collections.collection_frontiers(id)?;
3351 let mut as_of = description.sink.as_of.clone();
3352 as_of.join_assign(&export_state.implied_capability);
3353 let with_snapshot = if enable_snapshot_frontier
3354 && PartialOrder::less_than(&as_of, &export_state.write_frontier)
3355 {
3356 false
3357 } else {
3358 description.sink.with_snapshot
3359 };
3360
3361 info!(
3362 sink_id = %id,
3363 from_id = %description.sink.from,
3364 write_frontier = ?export_state.write_frontier,
3365 ?as_of,
3366 ?with_snapshot,
3367 "run_export"
3368 );
3369
3370 let cmd = RunSinkCommand {
3371 id,
3372 description: StorageSinkDesc {
3373 from: description.sink.from,
3374 from_desc: description.sink.from_desc.clone(),
3375 connection: description.sink.connection.clone(),
3376 envelope: description.sink.envelope,
3377 as_of,
3378 version: description.sink.version,
3379 from_storage_metadata,
3380 with_snapshot,
3381 to_storage_metadata,
3382 },
3383 };
3384
3385 let storage_instance_id = description.instance_id.clone();
3386
3387 let instance = self
3388 .instances
3389 .get_mut(&storage_instance_id)
3390 .ok_or_else(|| StorageError::ExportInstanceMissing {
3391 storage_instance_id,
3392 export_id: id,
3393 })?;
3394
3395 instance.send(StorageCommand::RunSink(Box::new(cmd)));
3396
3397 Ok(())
3398 }
3399
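    /// Recomputes the `Frontiers` and `ReplicaFrontiers` introspection
    /// collections from the currently active collection frontiers, appending
    /// only the changes relative to the previously recorded values.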
3400 fn update_frontier_introspection(&mut self) {
3405 let mut global_frontiers = BTreeMap::new();
3406 let mut replica_frontiers = BTreeMap::new();
3407
3408 for collection_frontiers in self.storage_collections.active_collection_frontiers() {
3409 let id = collection_frontiers.id;
3410 let since = collection_frontiers.read_capabilities;
3411 let upper = collection_frontiers.write_frontier;
3412
3413 let instance = self
3414 .collections
3415 .get(&id)
3416 .and_then(|collection_state| match &collection_state.extra_state {
3417 CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
3418 CollectionStateExtra::Export(export) => Some(export.cluster_id()),
3419 CollectionStateExtra::None => None,
3420 })
3421 .and_then(|i| self.instances.get(&i));
3422
3423 if let Some(instance) = instance {
3424 for replica_id in instance.replica_ids() {
3425 replica_frontiers.insert((id, replica_id), upper.clone());
3426 }
3427 }
3428
3429 global_frontiers.insert(id, (since, upper));
3430 }
3431
3432 let mut global_updates = Vec::new();
3433 let mut replica_updates = Vec::new();
3434
3435 let mut push_global_update =
3436 |id: GlobalId, (since, upper): (Antichain<T>, Antichain<T>), diff: Diff| {
3437 let read_frontier = since.into_option().map_or(Datum::Null, |t| t.into());
3438 let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3439 let row = Row::pack_slice(&[
3440 Datum::String(&id.to_string()),
3441 read_frontier,
3442 write_frontier,
3443 ]);
3444 global_updates.push((row, diff));
3445 };
3446
3447 let mut push_replica_update =
3448 |(id, replica_id): (GlobalId, ReplicaId), upper: Antichain<T>, diff: Diff| {
3449 let write_frontier = upper.into_option().map_or(Datum::Null, |t| t.into());
3450 let row = Row::pack_slice(&[
3451 Datum::String(&id.to_string()),
3452 Datum::String(&replica_id.to_string()),
3453 write_frontier,
3454 ]);
3455 replica_updates.push((row, diff));
3456 };
3457
3458 let mut old_global_frontiers =
3459 std::mem::replace(&mut self.recorded_frontiers, global_frontiers);
3460 for (&id, new) in &self.recorded_frontiers {
3461 match old_global_frontiers.remove(&id) {
3462 Some(old) if &old != new => {
3463 push_global_update(id, new.clone(), Diff::ONE);
3464 push_global_update(id, old, Diff::MINUS_ONE);
3465 }
3466 Some(_) => (),
3467 None => push_global_update(id, new.clone(), Diff::ONE),
3468 }
3469 }
3470 for (id, old) in old_global_frontiers {
3471 push_global_update(id, old, Diff::MINUS_ONE);
3472 }
3473
3474 let mut old_replica_frontiers =
3475 std::mem::replace(&mut self.recorded_replica_frontiers, replica_frontiers);
3476 for (&key, new) in &self.recorded_replica_frontiers {
3477 match old_replica_frontiers.remove(&key) {
3478 Some(old) if &old != new => {
3479 push_replica_update(key, new.clone(), Diff::ONE);
3480 push_replica_update(key, old, Diff::MINUS_ONE);
3481 }
3482 Some(_) => (),
3483 None => push_replica_update(key, new.clone(), Diff::ONE),
3484 }
3485 }
3486 for (key, old) in old_replica_frontiers {
3487 push_replica_update(key, old, Diff::MINUS_ONE);
3488 }
3489
3490 let id = self.introspection_ids[&IntrospectionType::Frontiers];
3491 self.collection_manager
3492 .differential_append(id, global_updates);
3493
3494 let id = self.introspection_ids[&IntrospectionType::ReplicaFrontiers];
3495 self.collection_manager
3496 .differential_append(id, replica_updates);
3497 }
3498
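    /// Updates per-collection wallclock lag maxima and metrics from the current
    /// write frontiers and, if histogram collection is enabled, stashes lag
    /// observations bucketed by power-of-two seconds and labeled by workload class.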
3499 fn refresh_wallclock_lag(&mut self) {
3508 self.maybe_record_wallclock_lag();
3510
3511 let now_ms = (self.now)();
3512 let histogram_period =
3513 WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set());
3514
3515 let frontier_lag = |frontier: &Antichain<T>| match frontier.as_option() {
3516 Some(ts) => (self.wallclock_lag)(ts.clone()),
3517 None => Duration::ZERO,
3518 };
3519
3520 for frontiers in self.storage_collections.active_collection_frontiers() {
3521 let id = frontiers.id;
3522 let Some(collection) = self.collections.get_mut(&id) else {
3523 continue;
3524 };
3525
3526 let lag = frontier_lag(&frontiers.write_frontier);
3527 collection.wallclock_lag_max = std::cmp::max(collection.wallclock_lag_max, lag);
3528 collection.wallclock_lag_metrics.observe(lag);
3529
3530 if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) {
3531 continue;
3532 }
3533
3534 if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
3535 let bucket = lag.as_secs().next_power_of_two();
3536
3537 let instance_id = match &collection.extra_state {
3538 CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3539 CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3540 CollectionStateExtra::None => None,
3541 };
3542 let workload_class = instance_id
3543 .and_then(|id| self.instances.get(&id))
3544 .and_then(|i| i.workload_class.clone());
3545 let labels = match workload_class {
3546 Some(wc) => [("workload_class", wc.clone())].into(),
3547 None => BTreeMap::new(),
3548 };
3549
3550 let key = (histogram_period, bucket, labels);
3551 *stash.entry(key).or_default() += Diff::ONE;
3552 }
3553 }
3554 }
3555
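    /// Flushes accumulated wallclock lag maxima and histogram stashes to the
    /// `WallclockLagHistory` and `WallclockLagHistogram` introspection
    /// collections, at most once per `WALLCLOCK_LAG_RECORDING_INTERVAL`.
    /// Does nothing in read-only mode.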
3556 fn maybe_record_wallclock_lag(&mut self) {
3564 if self.read_only {
3565 return;
3566 }
3567
3568 let duration_trunc = |datetime: DateTime<_>, interval| {
3569 let td = TimeDelta::from_std(interval).ok()?;
3570 datetime.duration_trunc(td).ok()
3571 };
3572
3573 let interval = WALLCLOCK_LAG_RECORDING_INTERVAL.get(self.config.config_set());
3574 let now_dt = mz_ore::now::to_datetime((self.now)());
3575 let now_trunc = duration_trunc(now_dt, interval).unwrap_or_else(|| {
3576 soft_panic_or_log!("excessive wallclock lag recording interval: {interval:?}");
3577 let default = WALLCLOCK_LAG_RECORDING_INTERVAL.default();
3578 duration_trunc(now_dt, *default).unwrap()
3579 });
3580 if now_trunc <= self.wallclock_lag_last_recorded {
3581 return;
3582 }
3583
3584 let now_ts: CheckedTimestamp<_> = now_trunc.try_into().expect("must fit");
3585
3586 let mut history_updates = Vec::new();
3587 let mut histogram_updates = Vec::new();
3588 let mut row_buf = Row::default();
3589 for frontiers in self.storage_collections.active_collection_frontiers() {
3590 let id = frontiers.id;
3591 let Some(collection) = self.collections.get_mut(&id) else {
3592 continue;
3593 };
3594
3595 let max_lag = std::mem::take(&mut collection.wallclock_lag_max);
3596 let max_lag_us = i64::try_from(max_lag.as_micros()).expect("must fit");
3597 let row = Row::pack_slice(&[
3598 Datum::String(&id.to_string()),
3599 Datum::Null,
3600 Datum::Interval(Interval::new(0, 0, max_lag_us)),
3601 Datum::TimestampTz(now_ts),
3602 ]);
3603 history_updates.push((row, Diff::ONE));
3604
3605 let Some(stash) = &mut collection.wallclock_lag_histogram_stash else {
3606 continue;
3607 };
3608
3609 for ((period, lag, labels), count) in std::mem::take(stash) {
3610 let mut packer = row_buf.packer();
3611 packer.extend([
3612 Datum::TimestampTz(period.start),
3613 Datum::TimestampTz(period.end),
3614 Datum::String(&id.to_string()),
3615 Datum::UInt64(lag),
3616 ]);
3617 let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3618 packer.push_dict(labels);
3619
3620 histogram_updates.push((row_buf.clone(), count));
3621 }
3622 }
3623
3624 if !history_updates.is_empty() {
3625 self.append_introspection_updates(
3626 IntrospectionType::WallclockLagHistory,
3627 history_updates,
3628 );
3629 }
3630 if !histogram_updates.is_empty() {
3631 self.append_introspection_updates(
3632 IntrospectionType::WallclockLagHistogram,
3633 histogram_updates,
3634 );
3635 }
3636
3637 self.wallclock_lag_last_recorded = now_trunc;
3638 }
3639
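    /// Performs periodic maintenance: refreshes frontier introspection,
    /// wallclock lag tracking, and per-instance state metrics.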
3640 fn maintain(&mut self) {
3645 self.update_frontier_introspection();
3646 self.refresh_wallclock_lag();
3647
3648 for instance in self.instances.values_mut() {
3650 instance.refresh_state_metrics();
3651 }
3652 }
3653}
3654
3655impl From<&IntrospectionType> for CollectionManagerKind {
3656 fn from(value: &IntrospectionType) -> Self {
3657 match value {
3658 IntrospectionType::ShardMapping
3659 | IntrospectionType::Frontiers
3660 | IntrospectionType::ReplicaFrontiers
3661 | IntrospectionType::StorageSourceStatistics
3662 | IntrospectionType::StorageSinkStatistics
3663 | IntrospectionType::ComputeDependencies
3664 | IntrospectionType::ComputeOperatorHydrationStatus
3665 | IntrospectionType::ComputeMaterializedViewRefreshes
3666 | IntrospectionType::ComputeErrorCounts
3667 | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential,
3668
3669 IntrospectionType::SourceStatusHistory
3670 | IntrospectionType::SinkStatusHistory
3671 | IntrospectionType::PrivatelinkConnectionStatusHistory
3672 | IntrospectionType::ReplicaStatusHistory
3673 | IntrospectionType::ReplicaMetricsHistory
3674 | IntrospectionType::WallclockLagHistory
3675 | IntrospectionType::WallclockLagHistogram
3676 | IntrospectionType::PreparedStatementHistory
3677 | IntrospectionType::StatementExecutionHistory
3678 | IntrospectionType::SessionHistory
3679 | IntrospectionType::StatementLifecycleHistory
3680 | IntrospectionType::SqlText => CollectionManagerKind::AppendOnly,
3681 }
3682 }
3683}
3684
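/// Returns the rows of a statistics collection as of one step before `upper`,
/// or an empty vector if the collection has not advanced beyond the minimum
/// timestamp.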
3685async fn snapshot_statistics<T>(
3691 id: GlobalId,
3692 upper: Antichain<T>,
3693 storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
3694) -> Vec<Row>
3695where
3696 T: Codec64 + From<EpochMillis> + TimestampManipulation,
3697{
3698 match upper.as_option() {
3699 Some(f) if f > &T::minimum() => {
3700 let as_of = f.step_back().unwrap();
3701
3702 let snapshot = storage_collections.snapshot(id, as_of).await.unwrap();
3703 snapshot
3704 .into_iter()
3705 .map(|(row, diff)| {
3706 assert_eq!(diff, 1);
3707 row
3708 })
3709 .collect()
3710 }
3711 _ => Vec::new(),
3714 }
3715}
3716
3717async fn read_handle_for_snapshot<T>(
3718 persist: &PersistClientCache,
3719 id: GlobalId,
3720 metadata: &CollectionMetadata,
3721) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>>
3722where
3723 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
3724{
3725 let persist_client = persist
3726 .open(metadata.persist_location.clone())
3727 .await
3728 .unwrap();
3729
3730 let read_handle = persist_client
3735 .open_leased_reader::<SourceData, (), _, _>(
3736 metadata.data_shard,
3737 Arc::new(metadata.relation_desc.clone()),
3738 Arc::new(UnitSchema),
3739 Diagnostics {
3740 shard_name: id.to_string(),
3741 handle_purpose: format!("snapshot {}", id),
3742 },
3743 USE_CRITICAL_SINCE_SNAPSHOT.get(persist_client.dyncfgs()),
3744 )
3745 .await
3746 .expect("invalid persist usage");
3747 Ok(read_handle)
3748}
3749
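/// Controller state kept for each storage collection: its data source, persist
/// metadata, data-source-specific extra state, and wallclock lag tracking.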
3750#[derive(Debug)]
3752struct CollectionState<T: TimelyTimestamp> {
3753 pub data_source: DataSource<T>,
3755
3756 pub collection_metadata: CollectionMetadata,
3757
3758 pub extra_state: CollectionStateExtra<T>,
3759
3760 wallclock_lag_max: Duration,
3762 wallclock_lag_histogram_stash: Option<
3769 BTreeMap<
3770 (
3771 WallclockLagHistogramPeriod,
3772 u64,
3773 BTreeMap<&'static str, String>,
3774 ),
3775 Diff,
3776 >,
3777 >,
3778 wallclock_lag_metrics: WallclockLagMetrics,
3780}
3781
3782impl<T: TimelyTimestamp> CollectionState<T> {
3783 fn new(
3784 data_source: DataSource<T>,
3785 collection_metadata: CollectionMetadata,
3786 extra_state: CollectionStateExtra<T>,
3787 wallclock_lag_metrics: WallclockLagMetrics,
3788 ) -> Self {
3789 let wallclock_lag_histogram_stash = match &data_source {
3793 DataSource::Other => None,
3794 _ => Some(Default::default()),
3795 };
3796
3797 Self {
3798 data_source,
3799 collection_metadata,
3800 extra_state,
3801 wallclock_lag_max: Default::default(),
3802 wallclock_lag_histogram_stash,
3803 wallclock_lag_metrics,
3804 }
3805 }
3806}
3807
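/// Data-source-specific extra state for a collection: ingestion state, export
/// state, or nothing.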
3808#[derive(Debug)]
3810enum CollectionStateExtra<T: TimelyTimestamp> {
3811 Ingestion(IngestionState<T>),
3812 Export(ExportState<T>),
3813 None,
3814}
3815
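/// Controller-side state kept for an ingestion: its read capabilities and
/// derived since, read holds on its dependencies, write frontier, hold policy,
/// owning storage instance, and the set of replicas on which it is hydrated.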
3816#[derive(Debug)]
3818struct IngestionState<T: TimelyTimestamp> {
3819 pub read_capabilities: MutableAntichain<T>,
3821
3822 pub derived_since: Antichain<T>,
3825
3826 pub dependency_read_holds: Vec<ReadHold<T>>,
3828
3829 pub write_frontier: Antichain<T>,
3831
3832 pub hold_policy: ReadPolicy<T>,
3839
3840 pub instance_id: StorageInstanceId,
3842
3843 pub hydrated_on: BTreeSet<ReplicaId>,
3845}
3846
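/// Describes how to enforce retention for a status history introspection
/// collection: which retention policy applies, and how to extract a row's key
/// and event time from its datums.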
3847struct StatusHistoryDesc<K> {
3852 retention_policy: StatusHistoryRetentionPolicy,
3853 extract_key: Box<dyn Fn(&[Datum]) -> K + Send>,
3854 extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>> + Send>,
3855}
3856enum StatusHistoryRetentionPolicy {
3857 LastN(usize),
3859 TimeWindow(Duration),
3861}
3862
3863fn source_status_history_desc(
3864 params: &StorageParameters,
3865) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3866 let desc = &MZ_SOURCE_STATUS_HISTORY_DESC;
3867 let (source_id_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists");
3868 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3869 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3870
3871 StatusHistoryDesc {
3872 retention_policy: StatusHistoryRetentionPolicy::LastN(
3873 params.keep_n_source_status_history_entries,
3874 ),
3875 extract_key: Box::new(move |datums| {
3876 (
3877 GlobalId::from_str(datums[source_id_idx].unwrap_str()).expect("GlobalId column"),
3878 if datums[replica_id_idx].is_null() {
3879 None
3880 } else {
3881 Some(
3882 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3883 .expect("ReplicaId column"),
3884 )
3885 },
3886 )
3887 }),
3888 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3889 }
3890}
3891
3892fn sink_status_history_desc(
3893 params: &StorageParameters,
3894) -> StatusHistoryDesc<(GlobalId, Option<ReplicaId>)> {
3895 let desc = &MZ_SINK_STATUS_HISTORY_DESC;
3896 let (sink_id_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists");
3897 let (replica_id_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3898 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3899
3900 StatusHistoryDesc {
3901 retention_policy: StatusHistoryRetentionPolicy::LastN(
3902 params.keep_n_sink_status_history_entries,
3903 ),
3904 extract_key: Box::new(move |datums| {
3905 (
3906 GlobalId::from_str(datums[sink_id_idx].unwrap_str()).expect("GlobalId column"),
3907 if datums[replica_id_idx].is_null() {
3908 None
3909 } else {
3910 Some(
3911 ReplicaId::from_str(datums[replica_id_idx].unwrap_str())
3912 .expect("ReplicaId column"),
3913 )
3914 },
3915 )
3916 }),
3917 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3918 }
3919}
3920
3921fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> {
3922 let desc = &MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC;
3923 let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists");
3924 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3925
3926 StatusHistoryDesc {
3927 retention_policy: StatusHistoryRetentionPolicy::LastN(
3928 params.keep_n_privatelink_status_history_entries,
3929 ),
3930 extract_key: Box::new(move |datums| {
3931 GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column")
3932 }),
3933 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3934 }
3935}
3936
3937fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> {
3938 let desc = &REPLICA_STATUS_HISTORY_DESC;
3939 let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists");
3940 let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists");
3941 let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists");
3942
3943 StatusHistoryDesc {
3944 retention_policy: StatusHistoryRetentionPolicy::TimeWindow(
3945 params.replica_status_history_retention_window,
3946 ),
3947 extract_key: Box::new(move |datums| {
3948 (
3949 GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"),
3950 datums[process_idx].unwrap_uint64(),
3951 )
3952 }),
3953 extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()),
3954 }
3955}
3956
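/// Replaces `from` with `replace_with` if `replace_with` is at least as
/// advanced, returning a [`ChangeBatch`] describing the swap (`+1` for the new
/// elements, `-1` for the old ones). If `replace_with` is not ahead, `from` is
/// left untouched and the returned batch is empty.
///
/// A minimal sketch with hypothetical `u64` timestamps:
///
/// ```ignore
/// let mut since = Antichain::from_elem(3u64);
/// let update = swap_updates(&mut since, Antichain::from_elem(5u64));
/// // `since` is now {5}; `update` nets out to {(5, +1), (3, -1)}.
/// ```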
3957fn swap_updates<T: Timestamp>(
3959 from: &mut Antichain<T>,
3960 mut replace_with: Antichain<T>,
3961) -> ChangeBatch<T> {
3962 let mut update = ChangeBatch::new();
3963 if PartialOrder::less_equal(from, &replace_with) {
3964 update.extend(replace_with.iter().map(|time| (time.clone(), 1)));
3965 std::mem::swap(from, &mut replace_with);
3966 update.extend(replace_with.iter().map(|time| (time.clone(), -1)));
3967 }
3968 update
3969}