use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

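/// An abstraction for tracking storage collections: their metadata, their
/// write/read frontiers, and the persist shards that back them. Callers can
/// take snapshots, acquire read holds, and create, alter, or drop
/// collections.
///
/// A minimal usage sketch (assuming some `id` and `as_of` in scope; marked
/// `ignore` because it is illustrative only):
///
/// ```ignore
/// // Hold back the since so `as_of` stays readable while we snapshot.
/// let hold = storage_collections
///     .acquire_read_holds(vec![id])?
///     .expect_element(|| "requested exactly one hold");
/// let rows = storage_collections.snapshot(id, as_of).await?;
/// drop(hold); // Releasing the hold lets the since advance again.
/// ```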
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    type Timestamp: TimelyTimestamp;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn update_parameters(&self, config_params: StorageParameters);

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}

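/// A cursor for reading a snapshot incrementally. Holds on to its
/// `ReadHandle` so that the leases backing the cursor stay valid while it is
/// consumed.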
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
        let iter = self.cursor.next().await?;
        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
    }
}

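/// A snapshot of the frontiers of a collection, as returned by
/// [`StorageCollections::collections_frontiers`].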
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    pub id: GlobalId,

    pub write_frontier: Antichain<T>,

    pub implied_capability: Antichain<T>,

    pub read_capabilities: Antichain<T>,
}

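/// Implementation of [`StorageCollections`] that shares its state across
/// clones: collection state lives behind an `Arc<Mutex<_>>`, while since
/// downgrades and shard finalization are driven by background tasks whose
/// handles abort on drop.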
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    envd_epoch: NonZeroI64,

    read_only: bool,

    finalizable_shards: Arc<ShardIdSet>,

    finalized_shards: Arc<ShardIdSet>,

    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    txns_read: TxnsRead<T>,

    config: Arc<Mutex<StorageConfiguration>>,

    initial_txn_upper: Antichain<T>,

    persist_location: PersistLocation,

    persist: Arc<PersistClientCache>,

    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

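/// Shorthand for a boxed stream of `(SourceData, T, StorageDiff)` updates, as
/// returned by the snapshot-and-stream methods below.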
type SourceDataStream<T> = BoxStream<'static, (SourceData, T, StorageDiff)>;

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

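    /// Opens a write handle and a since handle for the given shard. In
    /// read-only mode the since handle is a leased read handle; otherwise we
    /// take over the critical (controller) since handle. The write handle's
    /// upper is refreshed before returning.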
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

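    /// Opens the critical (controller) since handle for the given shard and
    /// fences out older processes by installing our `envd_epoch` as the
    /// handle's opaque. Halts the process if a newer epoch has already taken
    /// over the handle.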
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

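    /// Opens a leased read handle for the given shard and downgrades it to
    /// `since` (joined with the shard's current since).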
    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

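    /// Determines the dependencies of a collection for which we need to hold
    /// back the since: its primary collection (if any), plus
    /// data-source-specific ones such as a source's remap collection or a
    /// sink's input collection.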
    fn determine_collection_dependencies(
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => (),
                    _ => dependencies.push(*remap_collection_id),
                }
            }
            DataSource::Ingestion(ingestion) => {
                if ingestion.remap_collection_id != source_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
        }

        Ok(dependencies)
    }

    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner"
            );
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!"
            );
        }

        Ok(())
    }

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<SourceDataStream<T>, StorageError<T>>> {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

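    /// Applies `updates` to the in-memory read capabilities and propagates
    /// changes transitively along `storage_dependencies`, accumulating the
    /// net change per collection. Collections whose overall read frontier
    /// becomes the empty antichain are removed, and any resulting persist
    /// compaction is handed to the background task as a `DowngradeSince`
    /// command.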
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        let mut collections_net = BTreeMap::new();

        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                    read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                        install read capabilities before the current \
                        frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies"
                );
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = collection.primary.is_none();

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }

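    /// Drops finalized shards from our in-memory set once they are no longer
    /// listed as unfinalized in `storage_metadata`, i.e. once their
    /// finalization has been durably recorded.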
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl<T> StorageCollections for StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    type Timestamp = T;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(CollectionMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id: id.clone(),
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(CollectionMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: id.clone(),
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => Vec::new(),
            _ => {
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e.into()) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

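        // Registration order matters: a collection's dependencies must
        // already be present in `self_collections` when it is processed
        // below. Tables sort first in reverse id order, since (per
        // `alter_table_desc`) an altered table depends on its replacement,
        // which has a higher id; sinks sort last, after their inputs.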
1895 #[derive(Ord, PartialOrd, Eq, PartialEq)]
1897 enum DependencyOrder {
1898 Table(Reverse<GlobalId>),
1900 Collection(GlobalId),
1902 Sink(GlobalId),
1904 }
1905 to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
1906 DataSource::Table => DependencyOrder::Table(Reverse(*id)),
1907 DataSource::Sink { .. } => DependencyOrder::Sink(*id),
1908 _ => DependencyOrder::Collection(*id),
1909 });
1910
1911 let mut self_collections = self.collections.lock().expect("lock poisoned");
1914
1915 for (id, description, write_handle, since_handle, metadata) in to_register {
1916 let write_frontier = write_handle.upper();
1917 let data_shard_since = since_handle.since().clone();
1918
1919 let storage_dependencies =
1921 Self::determine_collection_dependencies(&*self_collections, id, &description)?;
1922
1923 let initial_since = match storage_dependencies
1925 .iter()
1926 .at_most_one()
1927 .expect("should have at most one dependency")
1928 {
1929 Some(dep) => {
1930 let dependency_collection = self_collections
1931 .get(dep)
1932 .ok_or(StorageError::IdentifierMissing(*dep))?;
1933 let dependency_since = dependency_collection.implied_capability.clone();
1934
1935 if PartialOrder::less_than(&data_shard_since, &dependency_since) {
1946 mz_ore::soft_assert_or_log!(
1965 write_frontier.elements() == &[T::minimum()]
1966 || write_frontier.is_empty()
1967 || PartialOrder::less_than(&dependency_since, write_frontier),
1968 "dependency ({dep}) since has advanced past dependent ({id}) upper \n
1969 dependent ({id}): since {:?}, upper {:?} \n
1970 dependency ({dep}): since {:?}",
1971 data_shard_since,
1972 write_frontier,
1973 dependency_since
1974 );
1975
1976 dependency_since
1977 } else {
1978 data_shard_since
1979 }
1980 }
1981 None => data_shard_since,
1982 };
1983
1984 let time_dependence = {
1986 use DataSource::*;
1987 if let Some(timeline) = &description.timeline
1988 && *timeline != Timeline::EpochMilliseconds
1989 {
1990 None
1992 } else {
1993 match &description.data_source {
1994 Ingestion(ingestion) => {
1995 use GenericSourceConnection::*;
1996 match ingestion.desc.connection {
1997 Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
2000 Some(TimeDependence::default())
2001 }
2002 LoadGenerator(_) => None,
2004 }
2005 }
2006 IngestionExport { ingestion_id, .. } => {
2007 let c = self_collections.get(ingestion_id).expect("known to exist");
2008 c.time_dependence.clone()
2009 }
2010 Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2012 Some(TimeDependence::default())
2013 }
2014 Other => None,
2016 Sink { .. } => None,
2017 }
2018 }
2019 };
2020
2021 let ingestion_remap_collection_id = match &description.data_source {
2022 DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
2023 _ => None,
2024 };
2025
2026 let mut collection_state = CollectionState::new(
2027 description.primary,
2028 time_dependence,
2029 ingestion_remap_collection_id,
2030 initial_since,
2031 write_frontier.clone(),
2032 storage_dependencies,
2033 metadata.clone(),
2034 );
2035
2036 match &description.data_source {
2038 DataSource::Introspection(_) => {
2039 self_collections.insert(id, collection_state);
2040 }
2041 DataSource::Webhook => {
2042 self_collections.insert(id, collection_state);
2043 }
2044 DataSource::IngestionExport { .. } => {
2045 self_collections.insert(id, collection_state);
2046 }
2047 DataSource::Table => {
2048 if is_in_txns(id, &metadata)
2051 && PartialOrder::less_than(
2052 &collection_state.write_frontier,
2053 &self.initial_txn_upper,
2054 )
2055 {
2056 collection_state
2062 .write_frontier
2063 .clone_from(&self.initial_txn_upper);
2064 }
2065 self_collections.insert(id, collection_state);
2066 }
2067 DataSource::Progress | DataSource::Other => {
2068 self_collections.insert(id, collection_state);
2069 }
2070 DataSource::Ingestion(_) => {
2071 self_collections.insert(id, collection_state);
2072 }
2073 DataSource::Sink { .. } => {
2074 self_collections.insert(id, collection_state);
2075 }
2076 }
2077
2078 self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);
2079
2080 self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
2082 }
2083
2084 drop(self_collections);
2085
2086 self.synchronize_finalized_shards(storage_metadata);
2087
2088 Ok(())
2089 }
2090
2091 async fn alter_table_desc(
2092 &self,
2093 existing_collection: GlobalId,
2094 new_collection: GlobalId,
2095 new_desc: RelationDesc,
2096 expected_version: RelationVersion,
2097 ) -> Result<(), StorageError<Self::Timestamp>> {
2098 let data_shard = {
2099 let self_collections = self.collections.lock().expect("lock poisoned");
2100 let existing = self_collections
2101 .get(&existing_collection)
2102 .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2103
2104 existing.collection_metadata.data_shard
2105 };
2106
2107 let persist_client = self
2108 .persist
2109 .open(self.persist_location.clone())
2110 .await
2111 .unwrap();
2112
2113 let diagnostics = Diagnostics {
2115 shard_name: existing_collection.to_string(),
2116 handle_purpose: "alter_table_desc".to_string(),
2117 };
2118 let expected_schema = expected_version.into();
2120 let schema_result = persist_client
2121 .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2122 data_shard,
2123 expected_schema,
2124 &new_desc,
2125 &UnitSchema,
2126 diagnostics,
2127 )
2128 .await
2129 .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2130 tracing::info!(
2131 ?existing_collection,
2132 ?new_collection,
2133 ?new_desc,
2134 "evolved schema"
2135 );
2136
2137 match schema_result {
2138 CaESchema::Ok(id) => id,
2139 CaESchema::ExpectedMismatch {
2141 schema_id,
2142 key,
2143 val,
2144 } => {
2145 mz_ore::soft_panic_or_log!(
2146 "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2147 );
2148 return Err(StorageError::Generic(anyhow::anyhow!(
2149 "schema expected mismatch, {existing_collection:?}",
2150 )));
2151 }
2152 CaESchema::Incompatible => {
2153 mz_ore::soft_panic_or_log!(
2154 "incompatible schema! {existing_collection} {new_desc:?}"
2155 );
2156 return Err(StorageError::Generic(anyhow::anyhow!(
2157 "schema incompatible, {existing_collection:?}"
2158 )));
2159 }
2160 };
2161
2162 let (write_handle, since_handle) = self
2164 .open_data_handles(
2165 &new_collection,
2166 data_shard,
2167 None,
2168 new_desc.clone(),
2169 &persist_client,
2170 )
2171 .await;
2172
2173 {
2179 let mut self_collections = self.collections.lock().expect("lock poisoned");
2180
2181 let existing = self_collections
2183 .get_mut(&existing_collection)
2184 .expect("existing collection missing");
2185
2186 assert_none!(existing.primary);
2188
2189 existing.primary = Some(new_collection);
2191 existing.storage_dependencies.push(new_collection);
2192
2193 let implied_capability = existing.read_capabilities.frontier().to_owned();
2197 let write_frontier = existing.write_frontier.clone();
2198
2199 let mut changes = ChangeBatch::new();
2206 changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2207
2208 let collection_meta = CollectionMetadata {
2210 persist_location: self.persist_location.clone(),
2211 relation_desc: new_desc.clone(),
2212 data_shard,
2213 txns_shard: Some(self.txns_read.txns_id().clone()),
2214 };
2215 let collection_state = CollectionState::new(
2216 None,
2217 existing.time_dependence.clone(),
2218 existing.ingestion_remap_collection_id.clone(),
2219 implied_capability,
2220 write_frontier,
2221 Vec::new(),
2222 collection_meta,
2223 );
2224
2225 self_collections.insert(new_collection, collection_state);
2227
2228 let mut updates = BTreeMap::from([(new_collection, changes)]);
2229 StorageCollectionsImpl::update_read_capabilities_inner(
2230 &self.cmd_tx,
2231 &mut *self_collections,
2232 &mut updates,
2233 );
2234 };
2235
2236 self.register_handles(new_collection, true, since_handle, write_handle);
2238
2239 info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2240
2241 Ok(())
2242 }
2243
2244 fn drop_collections_unvalidated(
2245 &self,
2246 storage_metadata: &StorageMetadata,
2247 identifiers: Vec<GlobalId>,
2248 ) {
2249 debug!(?identifiers, "drop_collections_unvalidated");
2250
2251 let mut self_collections = self.collections.lock().expect("lock poisoned");
2252
2253 let mut finalized_policies = Vec::new();
2261
2262 for id in identifiers {
2263 let Some(collection) = self_collections.get(&id) else {
2265 continue;
2266 };
2267
2268 if collection.primary.is_none() {
2271 let metadata = storage_metadata.get_collection_shard::<T>(id);
2272 mz_ore::soft_assert_or_log!(
2273 matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2274 "dropping {id}, but drop was not synchronized with storage \
2275 controller via `prepare_state`"
2276 );
2277 }
2278
2279 finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2280 }
2281
2282 self.set_read_policies_inner(&mut self_collections, finalized_policies);
2283
2284 drop(self_collections);
2285
2286 self.synchronize_finalized_shards(storage_metadata);
2287 }
2288
2289 fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2290 let mut collections = self.collections.lock().expect("lock poisoned");
2291
2292 if tracing::enabled!(tracing::Level::TRACE) {
2293 let user_capabilities = collections
2294 .iter_mut()
2295 .filter(|(id, _c)| id.is_user())
2296 .map(|(id, c)| {
2297 let updates = c.read_capabilities.updates().cloned().collect_vec();
2298 (*id, c.implied_capability.clone(), updates)
2299 })
2300 .collect_vec();
2301
2302 trace!(?policies, ?user_capabilities, "set_read_policies");
2303 }
2304
2305 self.set_read_policies_inner(&mut collections, policies);
2306
2307 if tracing::enabled!(tracing::Level::TRACE) {
2308 let user_capabilities = collections
2309 .iter_mut()
2310 .filter(|(id, _c)| id.is_user())
2311 .map(|(id, c)| {
2312 let updates = c.read_capabilities.updates().cloned().collect_vec();
2313 (*id, c.implied_capability.clone(), updates)
2314 })
2315 .collect_vec();
2316
2317 trace!(?user_capabilities, "after! set_read_policies");
2318 }
2319 }
2320
2321 fn acquire_read_holds(
2322 &self,
2323 desired_holds: Vec<GlobalId>,
2324 ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
2325 if desired_holds.is_empty() {
2326 return Ok(vec![]);
2327 }
2328
2329 let mut collections = self.collections.lock().expect("lock poisoned");
2330
2331 let mut advanced_holds = Vec::new();
2332 for id in desired_holds.iter() {
2343 let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
2344 let since = collection.read_capabilities.frontier().to_owned();
2345 advanced_holds.push((*id, since));
2346 }
2347
2348 let mut updates = advanced_holds
2349 .iter()
2350 .map(|(id, hold)| {
2351 let mut changes = ChangeBatch::new();
2352 changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2353 (*id, changes)
2354 })
2355 .collect::<BTreeMap<_, _>>();
2356
2357 StorageCollectionsImpl::update_read_capabilities_inner(
2358 &self.cmd_tx,
2359 &mut collections,
2360 &mut updates,
2361 );
2362
2363 let acquired_holds = advanced_holds
2364 .into_iter()
2365 .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2366 .collect_vec();
2367
2368 trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2369
2370 Ok(acquired_holds)
2371 }
2372
2373 fn determine_time_dependence(
2375 &self,
2376 id: GlobalId,
2377 ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2378 use TimeDependenceError::CollectionMissing;
2379 let collections = self.collections.lock().expect("lock poisoned");
2380 let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2381 Ok(state.time_dependence.clone())
2382 }
2383
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            envd_epoch,
            read_only,
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read: _,
            config,
            initial_txn_upper,
            persist_location,
            persist: _,
            cmd_tx: _,
            holds_tx: _,
            _background_task: _,
            _finalize_shards_task: _,
        } = self;

        let finalizable_shards: Vec<_> = finalizable_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let finalized_shards: Vec<_> = finalized_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let collections: BTreeMap<_, _> = collections
            .lock()
            .expect("poisoned")
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let config = format!("{:?}", config.lock().expect("poisoned"));

        Ok(serde_json::json!({
            "envd_epoch": envd_epoch,
            "read_only": read_only,
            "finalizable_shards": finalizable_shards,
            "finalized_shards": finalized_shards,
            "collections": collections,
            "config": config,
            "initial_txn_upper": initial_txn_upper,
            "persist_location": format!("{persist_location:?}"),
        }))
    }
}

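/// A wrapper presenting a uniform interface over the two kinds of persist
/// since handles we hold: a critical [SinceHandle], whose since is durably
/// pinned and guarded by an opaque [PersistEpoch], and a leased [ReadHandle],
/// whose hold expires with its lease and carries no epoch.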
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}

impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
                // Leased handles are not protected by an opaque epoch, so we
                // return the empty epoch.
                PersistEpoch(None)
            }
        }
    }

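    /// Downgrades the since. For critical handles this is a fencing
    /// compare-and-swap on the opaque epoch; for leased handles there is no
    /// epoch to compare, so the downgrade is unconditional.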
    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

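    /// Opportunistic variant of [Self::compare_and_downgrade_since]: critical
    /// handles may skip the downgrade and return `None`, e.g. when one was
    /// applied recently.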
    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

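    /// Returns a future that computes statistics about the shard contents at
    /// the given `as_of`, resolving once a snapshot at that frontier can be
    /// served.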
    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

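    /// Like [Self::snapshot_stats], but for collections that live in the
    /// txn-wal shard, where the `as_of` is determined by a [DataSnapshot].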
    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

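/// State maintained per collection.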
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// The ID of the primary collection, if this collection is an alias that
    /// shares state with another collection.
    primary: Option<GlobalId>,

    /// How this collection relates to wall-clock time, if known.
    time_dependence: Option<TimeDependence>,
    /// For ingestion collections, the ID of the associated remap collection.
    ingestion_remap_collection_id: Option<GlobalId>,

    /// Accumulation of read capabilities for the collection, spanning all
    /// outstanding [ReadHold]s as well as `implied_capability`.
    pub read_capabilities: MutableAntichain<T>,

    /// The implicit capability associated with collection creation.
    pub implied_capability: Antichain<T>,

    /// The policy that drives downgrades of `implied_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// Metadata for the collection, e.g. its persist shard.
    pub collection_metadata: CollectionMetadata,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    /// Creates state for a new collection, with the initial read policy
    /// permitting compaction up to the given `since`.
    pub fn new(
        primary: Option<GlobalId>,
        time_dependence: Option<TimeDependence>,
        ingestion_remap_collection_id: Option<GlobalId>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            primary,
            time_dependence,
            ingestion_remap_collection_id,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

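    /// Reports whether the collection is dropped: a collection counts as
    /// dropped once all read capabilities, including the implied capability,
    /// have been released.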
    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}

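/// A task that keeps collection state up to date in the background: it
/// listens for upper changes on write handles, downgrades persist sinces, and
/// applies read-hold changes that arrive via the holds channel.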
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    // Tracks the shard backing each collection, so that stale write handles
    // (from before a collection was re-created) can be recognized and
    // expired.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}

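/// Commands that can be sent to the [BackgroundTask].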
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}

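/// A newtype around the future returned for a [BackgroundCmd::SnapshotStats]
/// request, so that we can give it a custom [Debug] implementation.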
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    async fn run(&mut self) {
        // Futures that fire when a write handle's upper advances past a
        // previously observed frontier.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    // All txns-backed collections advance together with the
                    // txns shard.
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            // The handle is still for the current shard:
                            // report the new upper and keep listening.
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // The handle is for an old shard of this
                            // collection: expire it and drop the update.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        // The command channel is closed, so we're shutting
                        // down.
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register { id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            if is_in_txns {
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }
                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            // The requested as_of can be arbitrarily far in
                            // the future, so we answer with a self-contained
                            // future for the caller to await, rather than
                            // blocking this loop.
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            // It's fine if the receiver hung up in the
                            // meantime.
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    // Opportunistically drain any other pending hold changes
                    // and fold them into one batch.
                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| (id.clone(), c.clone()))
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

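    /// Updates the write frontiers of the named collections to the given
    /// uppers, and downgrades any implied capabilities that the collections'
    /// read policies now allow to advance.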
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

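    /// Downgrades the persist sinces of the named collections. A downgrade to
    /// the empty frontier means the collection was dropped, in which case we
    /// also retire the handles and, if enabled, enqueue the shard for
    /// finalization.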
    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                // This can happen when a collection is concurrently dropped.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

            let epoch = since_handle.opaque();
            let result = if new_since.is_empty() {
                // An empty since means the collection was dropped, so we
                // downgrade unconditionally rather than via the rate-limited
                // `maybe_` variant.
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");

                    self.finalizable_shards.lock().insert(dropped_shard_id);
                } else {
                    // Skip finalization for this shard, but keep processing
                    // the remaining downgrades.
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                }

                res
            } else {
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
}

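/// Configuration for [finalize_shards_task].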
struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}

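/// A task that periodically attempts to finalize the shards enqueued in
/// `finalizable_shards`: it advances each shard's upper to the empty
/// antichain (and, when [STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION] is set,
/// its since as well), then asks persist to finalize the shard. Successfully
/// finalized shards move to `finalized_shards`.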
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        // Hold the lock only long enough to copy out the current set.
        let current_finalizable_shards =
            { finalizable_shards.lock().iter().cloned().collect_vec() };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // Advance the upper to the empty antichain, so no
                        // further writes can happen.
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
                            // Use the older of the two epochs: if the handle
                            // already carries a newer epoch than ours, the
                            // compare-and-downgrade below fails and we leave
                            // the shard to the newer environment.
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                handle_epoch
                            } else {
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            // Rather than retrying here, leave the shard in
                            // the set; the next tick will pick it up again.
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            // Bound the number of concurrently finalizing shards.
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // Take both locks together, so the shard is never in
                        // neither or both of the sets.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

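/// The `as_of` frontier at which to compute [SnapshotStats], either directly
/// against the shard or through the txn-wal machinery.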
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard that is written to directly, not through txn-wal.
    Direct(Antichain<T>),
    /// Stats for a txn-wal shard, resolved through a [DataSnapshot].
    Txns(DataSnapshot<T>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // Stats for an unregistered collection should error.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats at an as_of that the upper hasn't reached yet should not
        // resolve.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Request stats at as_of 1 up front; we await it after enough data
        // has been written.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write a single update at ts 0.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Stats at as_of 0 should resolve now.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write another update at ts 1, unblocking the pending as_of 1
        // request.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        // Keep the background task handle alive until the end of the test.
        drop(background_task);
    }

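    /// Test helper that routes a [BackgroundCmd::SnapshotStats] request
    /// through `cmds_tx` and awaits the resulting stats future.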
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}