use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;

use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceData, SourceDesc, SourceEnvelope,
    SourceExport, SourceExportDataConfig, Timeline,
};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

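/// Frontiers and durable state for a set of storage-managed collections.
///
/// Provides access to collection metadata, frontiers, snapshots, and read
/// holds, without requiring a full storage controller.
///
/// A sketch of how frontiers are typically consulted (assuming `collections`
/// implements this trait and `id`/`ts` refer to an existing collection and
/// timestamp):
///
/// ```ignore
/// let f = collections.collection_frontiers(id)?;
/// // `ts` is readable once the read frontier is <= ts and the write
/// // frontier has advanced beyond it.
/// let readable = f.read_capabilities.less_equal(&ts) && !f.write_frontier.less_equal(&ts);
/// ```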
#[async_trait]
pub trait StorageCollections: Debug {
    type Timestamp: TimelyTimestamp;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn update_parameters(&self, config_params: StorageParameters);

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
}

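/// A cursor over a snapshot, allowing the snapshot to be streamed out in
/// chunks. Holds on to the [`ReadHandle`] it reads from, keeping the read
/// lease alive for as long as the cursor is in use.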
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<
        impl Iterator<
            Item = (
                (Result<SourceData, String>, Result<(), String>),
                T,
                StorageDiff,
            ),
        > + Sized
        + '_,
    > {
        self.cursor.next().await
    }
}

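/// Frontiers of a collection, as returned by [`StorageCollections`] frontier
/// queries.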
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The id of the collection.
    pub id: GlobalId,

    /// The collection's write frontier: writes have happened up to, but not
    /// including, this frontier.
    pub write_frontier: Antichain<T>,

    /// The implied capability, derived from the collection's read policy.
    pub implied_capability: Antichain<T>,

    /// The accumulation of all read capabilities, including outstanding read
    /// holds; reads are possible at any frontier at or beyond this one.
    pub read_capabilities: Antichain<T>,
}

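/// An implementation of [`StorageCollections`] that maintains an in-memory
/// map of collection state, delegates persist handle maintenance to a
/// background task, and finalizes dropped shards in a separate task.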
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this process, asserted when acquiring critical
    /// since handles.
    envd_epoch: NonZeroI64,

    /// Whether this instance is in read-only mode, in which case it must not
    /// affect durable state (for example critical since handles).
    read_only: bool,

    /// Shards that have been dropped but are not yet finalized.
    finalizable_shards: Arc<ShardIdSet>,

    /// Shards that have been finalized but whose finalization has not yet
    /// been recorded durably.
    finalized_shards: Arc<ShardIdSet>,

    /// In-memory state for each known collection.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A read handle on the txn-wal shard.
    txns_read: TxnsRead<T>,

    /// The current storage configuration.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn-wal shard as observed at startup; table write
    /// frontiers are reported as at least this.
    initial_txn_upper: Antichain<T>,

    /// The location of the persist store backing all collections.
    persist_location: PersistLocation,

    /// A cache of persist clients.
    persist: Arc<PersistClientCache>,

    /// A channel for sending commands to the background task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// A channel for sending read-hold changes to the background task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles to the background tasks; they are aborted when this is
    /// dropped.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
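    /// Creates and initializes a new instance. Expects that the txn-wal
    /// shard has already been registered in the given [`StorageTxn`], and
    /// spawns the background and shard-finalization tasks whose handles are
    /// held (and aborted on drop) by the returned value.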
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

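    /// Opens the persist handles for the given shard: a write handle plus a
    /// since handle, which is a critical handle in read-write mode and a
    /// leased read handle in read-only mode.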
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // Fetch the upper once now, so that the handle reports a recent
        // upper to callers that consult it right after opening.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

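    /// Opens a write handle for the given shard.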
    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

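    /// Opens a critical since handle for the given shard, fencing out
    /// readers with an older `envd_epoch` and halting the process if we
    /// ourselves have been fenced out.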
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided since:
            // we must not regress the shard's since, but we do want to
            // advance it to at least the provided frontier.
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                // We may use the handle if the current epoch is unset or no
                // newer than ours.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Downgrade while atomically re-asserting the epoch;
                    // only success tells us we were not fenced in the
                    // meantime.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

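    /// Opens a leased read handle for the given shard and downgrades it to
    /// the join of its current since and the provided since.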
    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        // Take the join of the handle's since and the provided since: we
        // must not regress the shard's since, but we do want to advance it
        // to at least the provided frontier.
        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        // The background task owns the persist since handles, so it computes
        // the stats and sends back a future, which we then await.
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

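    /// Installs read capabilities on all of a collection's dependencies, at
    /// the collection's implied capability. A no-op for unknown collections
    /// and collections without dependencies.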
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

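    /// Determines the dependencies of the given data source, if any: the
    /// remap collection for ingestions and their exports, the primary
    /// collection for table replacements, and the upstream collection for
    /// sinks.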
    fn determine_collection_dependencies(
        &self,
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependencies = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => Vec::new(),
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                // Ingestion exports depend on their primary source's remap
                // collection.
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => Vec::new(),
                    _ => vec![ingestion.remap_collection_id],
                }
            }
            DataSource::Ingestion(ingestion) => vec![ingestion.remap_collection_id],
            DataSource::Sink { desc } => vec![desc.sink.from],
        };

        Ok(dependencies)
    }

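    /// Installs new read capabilities on all of `storage_dependencies`, on
    /// behalf of collection `from_id`.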
    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

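    /// Determines the collection's most recent upper, by opening a fresh
    /// write handle and consulting its shared upper.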
    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

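    /// Returns the contents of the collection at `as_of`, routing the read
    /// through the txn-wal machinery when the collection is registered in
    /// the txns shard.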
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // No txns shard: read the data shard directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // Make sure the txns shard is caught up past `as_of`
                    // before translating the read into a data-shard
                    // snapshot.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.expect("invalid protobuf data").0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

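    /// Like [`Self::snapshot`], but returns the contents as a stream of
    /// `(SourceData, T, StorageDiff)` instead of collecting them into a
    /// `Vec` of rows.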
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream
                .map(|((k, _v), t, d)| {
                    let data = k.expect("error while streaming from Persist");
                    (data, t, d)
                })
                .boxed();
            Ok(stream)
        }
        .boxed()
    }

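    /// Applies the given read policies and downgrades each collection's
    /// implied capability accordingly, forwarding any resulting capability
    /// changes.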
    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

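    /// Applies `updates` to the in-memory read capabilities, cascading
    /// changes to each collection's storage dependencies, and tells the
    /// background task to downgrade persist sinces where needed.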
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Location to record net changes and the resulting read frontiers.
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the largest id: collections generally have
        // greater ids than their dependencies, so processing in this order
        // lets changes cascade down to dependencies within a single pass.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Negative updates for an already-dropped collection can
                    // be safely ignored.
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                    read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                        install read capabilities before the current \
                        frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate the net changes into persist compaction commands.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = !matches!(
                    collection.description.data_source,
                    DataSource::Table { primary: Some(_) }
                );

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }

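    /// Forgets about shards in our finalized-shards set that are no longer
    /// listed as unfinalized in `storage_metadata`, meaning their
    /// finalization has been durably recorded.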
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl<T> StorageCollections for StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    type Timestamp = T;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id: id.clone(),
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(StorageError::IdentifierMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: id.clone(),
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        // If the collection lives in the txns shard, the as_of has to be
        // translated through txn-wal into a data-shard snapshot.
        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                // The upper is still the minimum, so there is nothing to
                // read yet.
                Vec::new()
            }
            _ => {
                // The upper is the empty antichain: the collection is closed
                // and has no latest readable timestamp.
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        // Delete the metadata of dropped collections and mark their shards
        // as unfinalized, so they eventually get finalized.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        // Reconcile: durably record any shards we have already finalized.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state: exact duplicates are
        // deduplicated, but the same id with differing descriptions is an
        // error.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            // Reject re-registration of an existing id with a different
            // description; hold the lock only as long as needed for the
            // check.
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        // Enrich each collection description with its metadata.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
                    Ok(shard)
                };

                let remap_shard = match &description.data_source {
                    DataSource::Ingestion(IngestionDescription {
                        remap_collection_id,
                        ..
                    }) => {
                        // Ingestions track the shard of their remap/progress
                        // collection.
                        Some(get_shard(*remap_collection_id)?)
                    }
                    _ => None,
                };

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    remap_shard,
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // Open persist handles for all collections concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!(
                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
                        id, metadata.remap_shard, metadata.data_shard
                    );

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Tables need their since advanced to the registration
                    // timestamp; the other data sources need no special
                    // treatment here.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table { .. } => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Bound the concurrency to avoid overwhelming persist.
            .buffer_unordered(50)
            .try_collect()
            .await?;

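        /// The order in which we register collections matters: a
        /// collection's initial since may be derived from its dependency's
        /// since, so dependencies have to be registered first. Tables go
        /// first (in reverse id order, so that a replacement table precedes
        /// the table it replaces), then other collections, then sinks.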
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            Table(Reverse<GlobalId>),
            Collection(GlobalId),
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, mut description, write_handle, since_handle, metadata) in to_register {
            // An ingestion is its own primary source export.
            if let DataSource::Ingestion(ingestion) = &mut description.data_source {
                let export = ingestion.desc.primary_source_export();
                ingestion.source_exports.insert(id, export);
            }

            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies = self
                .determine_collection_dependencies(&*self_collections, &description.data_source)?;

            // Start the collection's since at its dependency's since, if
            // that is further along than the data shard's since.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the primary source to contain this export.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table { .. } => {
                    // Tables are written through the txns shard, so their
                    // data shard's upper can lag behind; report at least the
                    // txns upper we observed at startup.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on
            // it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Assumes the caller has verified that the collection refers to an
        // ingestion and that the new `SourceDesc` is compatible.
        let mut self_collections = self.collections.lock().expect("lock poisoned");
        let collection = self_collections
            .get_mut(&ingestion_id)
            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;

        let curr_ingestion = match &mut collection.description.data_source {
            DataSource::Ingestion(active_ingestion) => active_ingestion,
            _ => unreachable!("verified collection refers to ingestion"),
        };

        curr_ingestion.desc = source_desc;
        debug!("altered {ingestion_id}'s SourceDesc");

        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (source_export_id, new_data_config) in source_exports {
            // Update the data config on the export's own collection state...
            let source_export_collection = self_collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.description.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // ...and on the parent ingestion's copy of the export.
            let ingestion_collection = self_collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.description.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, conn) in source_connections {
            let collection = self_collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.description.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is no sense in
                    // installing a new one.
                    if ingestion.desc.connection != conn {
                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                    } else {
                        warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        Ok(())
    }

2258 async fn alter_table_desc(
2259 &self,
2260 existing_collection: GlobalId,
2261 new_collection: GlobalId,
2262 new_desc: RelationDesc,
2263 expected_version: RelationVersion,
2264 ) -> Result<(), StorageError<Self::Timestamp>> {
2265 let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        match schema_result {
            CaESchema::Ok(id) => id,
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // The existing collection must be an unaltered table; altering
            // repoints it at the new (primary) collection.
            assert!(matches!(
                existing.description.data_source,
                DataSource::Table { primary: None }
            ));

            existing.description.data_source = DataSource::Table {
                primary: Some(new_collection),
            };
            existing.storage_dependencies.push(new_collection);

            // The new collection inherits the existing collection's
            // capabilities and write frontier.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            // Install the new collection's implied capability as a read hold.
            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

            let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: collection_desc.desc.clone(),
                data_shard,
                remap_shard: None,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                collection_desc,
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }

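    /// Drops the given collections without validating that the drop was
    /// synchronized with the storage controller.
    ///
    /// Dropping installs a `ReadPolicy::ValidFrom(Antichain::new())`, which
    /// lets the since advance all the way to the empty frontier, after which
    /// the backing shards become eligible for finalization.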
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for id in identifiers.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let dropped_data_source = match self_collections.get(id) {
                Some(col) => col.description.data_source.clone(),
                None => continue,
            };

            // If a subsource is dropped, unregister it from its primary
            // source's exports.
            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
                let ingestion = match self_collections.get_mut(&ingestion_id) {
                    Some(ingestion) => ingestion,
                    None => {
                        tracing::error!(
                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
                        );
                        continue;
                    }
                };

                match &mut ingestion.description {
                    CollectionDescription {
                        data_source: DataSource::Ingestion(ingestion_desc),
                        ..
                    } => {
                        let removed = ingestion_desc.source_exports.remove(id);
                        mz_ore::soft_assert_or_log!(
                            removed.is_some(),
                            "dropped subsource {id} already removed from source exports"
                        );
                    }
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };
            }
        }

        // Install policies that allow the since to advance to the empty
        // frontier, rendering the collections unreadable.
        let mut finalized_policies = Vec::new();

        for id in identifiers {
            if self_collections.contains_key(&id) {
                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
            }
        }
        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }

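    /// Installs the given read policies, which control how each collection's
    /// implied capability follows its write frontier.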
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }

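    /// Acquires [ReadHold]s for the given collections, at their current
    /// since frontiers.
    ///
    /// A sketch of the intended call pattern (the id is hypothetical, and
    /// the exact `ReadHold` accessors may differ):
    ///
    /// ```ignore
    /// let mut holds = storage_collections.acquire_read_holds(vec![id])?;
    /// let hold = holds.pop().expect("requested exactly one hold");
    /// // The collection stays readable at times not before the hold's since.
    /// // Dropping the hold sends a negative capability change through
    /// // `holds_tx`, which the background task folds back into
    /// // `read_capabilities`.
    /// drop(hold);
    /// ```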
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
        let mut collections = self.collections.lock().expect("lock poisoned");

        let mut advanced_holds = Vec::new();
        for id in desired_holds.iter() {
            let collection = collections
                .get(id)
                .ok_or(ReadHoldError::CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }

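    /// Determines whether the given collection ultimately follows wall-clock
    /// time, chasing `IngestionExport`s back to their primary ingestion.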
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);

        let mut result = None;

        while let Some(c) = collection.take() {
            use DataSource::*;
            if let Some(timeline) = &c.description.timeline {
                // Only the epoch timeline follows wall-clock time.
                if *timeline != Timeline::EpochMilliseconds {
                    break;
                }
            }
            match &c.description.data_source {
                Ingestion(ingestion) => {
                    use GenericSourceConnection::*;
                    match ingestion.desc.connection {
                        // These source types follow wall-clock time.
                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                            result = Some(TimeDependence::default())
                        }
                        // Load generators have no wall-clock dependence.
                        LoadGenerator(_) => {}
                    }
                }
                IngestionExport { ingestion_id, .. } => {
                    // Inherit the time dependence of the primary ingestion.
                    let c = collections
                        .get(ingestion_id)
                        .ok_or(CollectionMissing(*ingestion_id))?;
                    collection = Some(c);
                }
                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                    result = Some(TimeDependence::default())
                }
                Other => {}
                Sink { .. } => {}
            };
        }
        Ok(result)
    }
}

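/// Wraps either a critical, externally-fenced [SinceHandle] or a leased
/// [ReadHandle], so the rest of this module can hold back a shard's since
/// through a single interface.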
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}

impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
                // Leased handles don't have an opaque. We pretend the epoch
                // is absent, so callers can treat both variants uniformly.
                PersistEpoch(None)
            }
        }
    }

    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

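/// The in-memory state we maintain for each collection.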
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// Description with which the collection was created.
    pub description: CollectionDescription<T>,

    /// Accumulation of read capabilities for the collection, including the
    /// implied capability and any outstanding read holds.
    pub read_capabilities: MutableAntichain<T>,

    /// The implicit capability associated with collection creation.
    pub implied_capability: Antichain<T>,

    /// The policy that drives how the implied capability follows the write
    /// frontier.
    pub read_policy: ReadPolicy<T>,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    pub collection_metadata: CollectionMetadata,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    /// Creates a new collection state, with an initial read policy valid
    /// from `since`.
    pub fn new(
        description: CollectionDescription<T>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            description,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

    /// Reports whether the collection was dropped, i.e. no read capabilities
    /// remain to hold back its since.
    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}

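/// A task that watches persist for upper changes, applies read-hold changes
/// arriving on `holds_rx`, downgrades sinces, and answers snapshot-stats
/// requests, keeping the shared collection state up to date.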
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    // Tracks which shard a global ID maps to, so that stale upper futures
    // for re-created collections can be recognized and dropped.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}

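/// Commands that can be sent to a [BackgroundTask].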
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}

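/// A newtype wrapper around the future returned for
/// [BackgroundCmd::SnapshotStats], so that the boxed future can be given a
/// `Debug` impl. The full round trip looks roughly like this sketch
/// (mirroring the `snapshot_stats` helper in the tests below; `id` and
/// `as_of` are placeholders):
///
/// ```ignore
/// let (tx, rx) = oneshot::channel();
/// cmds_tx.send(BackgroundCmd::SnapshotStats(
///     id,
///     SnapshotStatsAsOf::Direct(as_of),
///     tx,
/// ))?;
/// let stats = rx.await?.0.await?;
/// ```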
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    async fn run(&mut self) {
        // Futures that wait for a collection's upper to advance past its
        // previously observed frontier.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // The collection has been re-created and now uses
                            // a different shard: this handle is stale, so
                            // expire it and don't re-enqueue a future for it.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        // All senders are gone, so we can shut down.
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register { id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            if is_in_txns {
                                // Collections in the txns system learn their
                                // uppers from the txns shard, not from their
                                // own write handle.
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }
                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            // Hand a future back to the caller instead of
                            // awaiting the stats here: the as_of may be far
                            // ahead of the shard's upper, and we must not
                            // block this loop.
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            // It's fine if the receiver hung up in the
                            // meantime.
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    // Drain any other pending hold changes, so we apply them
                    // all in one batch.
                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

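    /// Updates the write frontiers of the given collections and, where the
    /// read policy allows, downgrades their implied capabilities to follow.
    ///
    /// Capability movement is expressed as `ChangeBatch`es, along the lines
    /// of this sketch:
    ///
    /// ```ignore
    /// // Swap the implied capability from `old` to `new`:
    /// let mut update = ChangeBatch::new();
    /// update.extend(new.iter().map(|t| (t.clone(), 1)));
    /// update.extend(old.iter().map(|t| (t.clone(), -1)));
    /// ```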
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

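    /// Downgrades the since handles of the given collections. Downgrading to
    /// the empty frontier additionally drops our handles and, if shard
    /// finalization is enabled, enqueues the backing shard for finalization.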
    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                // This can happen when a collection is dropped concurrently.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

            let epoch = since_handle.opaque().clone();
            let result = if new_since.is_empty() {
                // A since of the empty frontier can never be advanced again,
                // so we can release our handles.
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if !self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                    return;
                }

                info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");

                self.finalizable_shards.lock().insert(dropped_shard_id);

                res
            } else {
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
}

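/// Configuration for [finalize_shards_task], which periodically finalizes
/// the shards in `finalizable_shards`: it appends an empty batch to advance
/// each shard's upper to the empty frontier, optionally downgrades the
/// critical since to the empty frontier, and then calls `finalize_shard`.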
struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}

async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
            // Hold the lock only long enough to clone the current set.
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        // Open a write handle with the shard's latest schema,
                        // so we can advance its upper to the empty frontier.
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        let schemas = persist_client
                            .latest_schema::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                diagnostics.clone(),
                            )
                            .await
                            .expect("codecs have not changed");
                        let (key_schema, val_schema) = match schemas {
                            Some((_, key_schema, val_schema)) => (key_schema, val_schema),
                            None => (RelationDesc::empty(), UnitSchema),
                        };

                        let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(key_schema),
                                    Arc::new(val_schema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");

                        let upper = write_handle.upper();

                        if !upper.is_empty() {
                            let append = write_handle
                                .append(empty_batch, upper.clone(), Antichain::new())
                                .await?;

                            if let Err(e) = append {
                                warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
                                return Ok(());
                            }
                        }
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                // Our epoch is newer: use the handle's
                                // current epoch so the compare-and-downgrade
                                // below matches.
                                handle_epoch
                            } else {
                                // The handle may belong to a newer envd; in
                                // that case the compare-and-downgrade below
                                // fails and we skip finalization here.
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!(
                                    "tried to finalize a shard with an advancing epoch: {e:?}"
                                );
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            // Poll a bounded number of finalizations concurrently.
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // Move the shard from the finalizable set to the
                        // finalized set.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

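/// The as_of at which to observe [SnapshotStats] for a shard.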
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Observe stats directly on the data shard, at the given as_of.
    Direct(Antichain<T>),
    /// Observe stats through a txn-wal [DataSnapshot], for shards that
    /// participate in the txns system.
    Txns(DataSnapshot<T>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // Requesting stats for an unregistered collection errors.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats for an as_of that the shard's upper hasn't passed yet don't
        // resolve immediately.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Keep a request in flight; it should resolve once the upper passes 1.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        drop(background_task);
    }

    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}