use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;

use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
    SourceExportDataConfig, Timeline,
};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

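/// An interface for tracking the state and frontiers of storage collections:
/// collection metadata, write frontiers, read capabilities, and read holds,
/// all backed by persist shards.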
#[async_trait]
pub trait StorageCollections: Debug {
    type Timestamp: TimelyTimestamp;

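    /// Initializes this [`StorageCollections`] from durable state in `txn`:
    /// registers collection metadata for any of `init_ids` not yet recorded
    /// and picks up shards that still await finalization.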
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

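    /// Applies `config_params` to this [`StorageCollections`] and its
    /// persist clients.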
    fn update_parameters(&self, config_params: StorageParameters);

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

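    /// Returns the frontiers of the identified collection; a convenience
    /// wrapper around [`StorageCollections::collections_frontiers`].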
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

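    /// Returns statistics about the snapshot of the collection identified by
    /// `id` at the given `as_of`.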
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

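    /// Returns the contents of the collection `id` at the most recent
    /// readable timestamp, i.e. one step back from the current upper. Fails
    /// if the collection's upper is the empty antichain.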
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

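    /// Prepares the given `txn` for a catalog change: records new shard
    /// mappings for `ids_to_add` and `ids_to_register`, moves the shards of
    /// `ids_to_drop` to the unfinalized set, and marks already-finalized
    /// shards as such.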
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

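    /// Creates the described collections, opening persist handles for their
    /// shards and registering their state. `register_ts` must be provided
    /// when any of `collections` is a table; `migrated_storage_collections`
    /// are collections that receive special treatment during bootstrap
    /// (e.g., their since is not advanced to `register_ts`).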
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

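    /// Evolves the persist schema of the table shard behind
    /// `existing_collection`, registering `new_collection` as the new
    /// primary collection for that shard with schema `new_desc`.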
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

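    /// Drops the identified collections without validating that they are
    /// eligible: read policies are set to the empty frontier so that state
    /// is reclaimed once all outstanding read holds are released.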
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

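    /// Acquires and returns read holds on the given collections, at the
    /// current frontier of read capabilities of each collection.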
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
}

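/// A cursor for lazily reading a snapshot out of persist, bundled with the
/// [`ReadHandle`] that keeps its lease alive.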
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    /// Holds the lease for `cursor` below; never read directly.
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<
        impl Iterator<
            Item = (
                (Result<SourceData, String>, Result<(), String>),
                T,
                StorageDiff,
            ),
        > + Sized
        + '_,
    > {
        self.cursor.next().await
    }
}

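/// A snapshot of the frontiers of a collection.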
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The identifier of the collection.
    pub id: GlobalId,

    /// The collection's write frontier.
    pub write_frontier: Antichain<T>,

    /// The implied capability, as derived from the collection's read policy
    /// and write frontier.
    pub implied_capability: Antichain<T>,

    /// The overall frontier of all outstanding read capabilities.
    pub read_capabilities: Antichain<T>,
}

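/// An implementation of [`StorageCollections`] that keeps collection state
/// in memory, guarded by a mutex, and delegates long-running work to a
/// background task.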
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this instance of the environment.
    envd_epoch: NonZeroI64,

    /// Whether this instance is running in read-only mode.
    read_only: bool,

    /// Shards that are queued for finalization.
    finalizable_shards: Arc<ShardIdSet>,

    /// Shards that have been finalized but not yet durably recorded as such.
    finalized_shards: Arc<ShardIdSet>,

    /// In-memory state of all known collections.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A read-only handle to the txn-wal shard.
    txns_read: TxnsRead<T>,

    /// The shared storage configuration.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn-wal shard as of when this instance was created.
    initial_txn_upper: Antichain<T>,

    /// The persist location of all collections.
    persist_location: PersistLocation,

    /// A cache of persist clients.
    persist: Arc<PersistClientCache>,

    /// Channel for sending commands to the background task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// Channel for sending read hold changes to the background task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles that abort the background tasks when dropped.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
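    /// Creates and initializes a new [`StorageCollectionsImpl`], opening
    /// handles for the txn-wal shard and spawning the background and shard
    /// finalization tasks. The txn-wal shard must already have been recorded
    /// in `txn`.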
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // We open a `TxnsHandle` only to ensure the txn-wal shard exists and
        // is initialized; the handle is dropped right away.
        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

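    /// Opens the write handle and a since handle (critical in read-write
    /// mode, leased in read-only mode) for the given shard, forwarding the
    /// since to `since` if one is provided.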
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // Refresh the handle's cached upper before handing it out, so that
        // callers observe a recent upper.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

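    /// Opens a critical since handle for `shard`, fencing out earlier
    /// environment epochs via the handle's opaque token. Panics if called in
    /// read-only mode.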
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                // `unwrap_or(true)`: if the epoch is unset, we are the first
                // to set it and may proceed.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Attempt to atomically install our epoch and the joined
                    // since; on failure (a concurrent change), re-check.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

    fn determine_collection_dependencies(
        &self,
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependencies = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => Vec::new(),
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => Vec::new(),
                    _ => vec![ingestion.remap_collection_id],
                }
            }
            // An ingestion depends on its remap collection, except when it
            // is its own remap collection.
            DataSource::Ingestion(ingestion) => {
                if ingestion.remap_collection_id == source_id {
                    vec![]
                } else {
                    vec![ingestion.remap_collection_id]
                }
            }
            DataSource::Sink { desc } => vec![desc.sink.from],
        };

        Ok(dependencies)
    }

    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // Wait until the txn-wal shard has advanced past `as_of`,
                    // so the data shard snapshot at `as_of` is complete.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.expect("invalid protobuf data").0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream
                .map(|((k, _v), t, d)| {
                    let data = k.expect("error while streaming from Persist");
                    (data, t, d)
                })
                .boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

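    /// Applies `updates` to the tracked read capabilities, propagating
    /// changes to storage dependencies and instructing the background task to
    /// downgrade persist sinces where capabilities have advanced.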
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Location to record the net changes we need to act on.
        let mut collections_net = BTreeMap::new();

        // Repeatedly pull off the update with the largest id: changes
        // propagate from a collection to its dependencies, which are
        // re-inserted into `updates` and picked up in later iterations.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Negative updates for an already-removed collection can
                    // be safely ignored.
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate the net changes into persist since downgrades.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // Collections that defer to a primary collection don't
                // downgrade the shared shard's since themselves; the primary
                // is responsible for that.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = !matches!(
                    collection.description.data_source,
                    DataSource::Table { primary: Some(_) }
                );

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }

    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        // Keep only those finalized shards that the durable metadata still
        // considers unfinalized; the rest have been recorded and can be
        // forgotten.
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl<T> StorageCollections for StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    type Timestamp = T;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        // Pick up any shards that still need finalization from the durable
        // txn state, so that finalization is retried after a restart.
        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id,
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(StorageError::IdentifierMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: *id,
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        // For txns-backed collections, route the request through a
        // `DataSnapshot` so that stats reflect the txn-wal state at `as_of`.
        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                // The collection has never been written to, so it is empty.
                Vec::new()
            }
            _ => {
                // The upper is the empty antichain: the collection is closed
                // and there is no sensible read timestamp based on the upper.
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        // Delete the metadata of any dropped collections and queue their
        // shards for finalization.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        // Record the shards we have finalized in the meantime, so that
        // finalization is not retried for them.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state: exact duplicates are
        // tolerated, but reusing an id for a different description is not.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        // Open the handles for all collections concurrently.
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table { .. } => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        // Register collections in dependency order: tables first, then other
        // collections, then sinks.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables, in reverse id order, since a table may depend on
            /// another table with a larger id (its primary).
            Table(Reverse<GlobalId>),
            /// Non-table collections, in id order.
            Collection(GlobalId),
            /// Sinks, which may depend on any of the above.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies = self.determine_collection_dependencies(
                &*self_collections,
                id,
                &description.data_source,
            )?;

            // A collection's initial since must not be in advance of its
            // dependency's since; if the shard's since is behind, adopt the
            // dependency's since instead.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the source to contain this export.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table { .. } => {
                    // A table in the txns shard can have a write frontier
                    // that lags the txns shard's upper; fast-forward it to
                    // the txn upper observed at startup so reads do not
                    // appear to regress.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");
        let collection = self_collections
            .get_mut(&ingestion_id)
            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;

        let curr_ingestion = match &mut collection.description.data_source {
            DataSource::Ingestion(active_ingestion) => active_ingestion,
            _ => unreachable!("verified collection refers to ingestion"),
        };

        curr_ingestion.desc = source_desc;
        debug!("altered {ingestion_id}'s SourceDesc");

        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (source_export_id, new_data_config) in source_exports {
            // Update the data config on the source export's own collection...
            let source_export_collection = self_collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.description.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // ...and on the parent ingestion's record of the export.
            let ingestion_collection = self_collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.description.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(
                            ?source_export_id,
                            from = ?source_export.data_config,
                            to = ?new_data_config,
                            "alter_ingestion_export_data_configs, updating"
                        );
                        source_export.data_config = new_data_config;
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, conn) in source_connections {
            let collection = self_collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.description.data_source {
                DataSource::Ingestion(ingestion) => {
                    if ingestion.desc.connection != conn {
                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                    } else {
                        warn!(
                            "alter_ingestion_connections called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    warn!("alter_ingestion_connections called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            // We only support altering tables.
            if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        // Evolve the schema of the underlying shard to the new relation
        // description.
        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        match schema_result {
            CaESchema::Ok(id) => id,
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        // Open new persist handles for the shard at the new schema.
        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // The existing collection must not have been altered before.
            assert!(matches!(
                existing.description.data_source,
                DataSource::Table { primary: None }
            ));

            // The new collection becomes the primary for the shared shard;
            // the existing collection now depends on it.
            existing.description.data_source = DataSource::Table {
                primary: Some(new_collection),
            };
            existing.storage_dependencies.push(new_collection);

            // Install the new collection at the existing collection's overall
            // read frontier and write frontier, and hold back its since with
            // a read capability.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

            let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: collection_desc.desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                collection_desc,
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for id in identifiers.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                 controller via `synchronize_collections`"
            );

            let dropped_data_source = match self_collections.get(id) {
                Some(col) => col.description.data_source.clone(),
                None => continue,
            };

            // If we are dropping a subsource, remove it from the set of
            // exports of its primary (ingestion) source.
            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
                let ingestion = match self_collections.get_mut(&ingestion_id) {
                    Some(ingestion) => ingestion,
                    None => {
                        tracing::error!(
                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
                        );
                        continue;
                    }
                };

                match &mut ingestion.description {
                    CollectionDescription {
                        data_source: DataSource::Ingestion(ingestion_desc),
                        ..
                    } => {
                        let removed = ingestion_desc.source_exports.remove(id);
                        mz_ore::soft_assert_or_log!(
                            removed.is_some(),
                            "dropped subsource {id} already removed from source exports"
                        );
                    }
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };
            }
        }

        let mut finalized_policies = Vec::new();

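        // A read policy of `ValidFrom(Antichain::new())` (the empty frontier)
        // releases the implied capability, which lets the since advance and
        // the collection eventually be finalized.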
        for id in identifiers {
            // The collection may already have been dropped concurrently.
            if self_collections.contains_key(&id) {
                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
            }
        }
        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }

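    /// Acquires [`ReadHold`]s for the given collections at their current
    /// since frontiers. The returned holds release themselves through the
    /// holds channel when dropped.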
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        let mut advanced_holds = Vec::new();
        // We acquire holds at the earliest possible time: the current read
        // frontier of each collection.
        for id in desired_holds.iter() {
            let collection = collections
                .get(id)
                .ok_or(ReadHoldError::CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        // Register the new holds as changes to the collections' read
        // capabilities.
        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }

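    /// Determines the wall-clock time dependence of a collection, following
    /// `IngestionExport`s through to their primary ingestion. Collections
    /// without a known relationship to wall-clock time yield `Ok(None)`.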
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);

        let mut result = None;

        while let Some(c) = collection.take() {
            use DataSource::*;
            if let Some(timeline) = &c.description.timeline {
                // Only the epoch timeline follows wall-clock time.
                if *timeline != Timeline::EpochMilliseconds {
                    break;
                }
            }
            match &c.description.data_source {
                Ingestion(ingestion) => {
                    use GenericSourceConnection::*;
                    match ingestion.desc.connection {
                        // Connections that follow wall-clock time.
                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                            result = Some(TimeDependence::default())
                        }
                        // Load generators have no wall-clock dependence.
                        LoadGenerator(_) => {}
                    }
                }
                IngestionExport { ingestion_id, .. } => {
                    // Subsources inherit their primary source's dependence.
                    let c = collections
                        .get(ingestion_id)
                        .ok_or(CollectionMissing(*ingestion_id))?;
                    collection = Some(c);
                }
                // These follow the system's wall-clock time.
                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                    result = Some(TimeDependence::default())
                }
                // No known wall-clock dependence for these.
                Other => {}
                Sink { .. } => {}
            };
        }
        Ok(result)
    }
}

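/// A wrapper that unifies the two kinds of since handles we hold for
/// collections: a critical (epoch-fenced) handle or a leased read handle.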
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}

impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
                // Leased handles are not fenced by an opaque/epoch, so
                // `PersistEpoch(None)` stands in for "no epoch".
                PersistEpoch(None)
            }
        }
    }

    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

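/// State we maintain for each collection: its description, read capabilities
/// and policy, frontiers, and persist metadata.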
#[derive(Debug, Clone)]
struct CollectionState<T> {
    pub description: CollectionDescription<T>,

    /// Accumulation of read capabilities for the collection, including the
    /// implied capability and any outstanding read holds.
    pub read_capabilities: MutableAntichain<T>,

    /// The capability implied by the collection's read policy.
    pub implied_capability: Antichain<T>,

    /// The policy that drives how the implied capability evolves.
    pub read_policy: ReadPolicy<T>,

    /// Collections on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    pub collection_metadata: CollectionMetadata,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    /// Creates a new collection state, with an initial `NoPolicy` read
    /// policy valid from `since`.
    pub fn new(
        description: CollectionDescription<T>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            description,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

    /// Returns whether the collection was dropped, that is, whether all read
    /// capabilities have been released.
    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}

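/// A background task that holds on to persist handles, applies read
/// capability changes and since downgrades, and tracks collection uppers by
/// listening for persist frontier advancements.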
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    // The data shard currently registered for each collection, used to
    // detect and expire upper futures from stale write handles.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}

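/// Commands sent to the [`BackgroundTask`].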
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}

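/// The response to a [`BackgroundCmd::SnapshotStats`]: a boxed future that
/// resolves to the stats, wrapped in a newtype so we can provide a manual
/// [`Debug`] impl.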
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    async fn run(&mut self) {
        // Futures that resolve once a collection's shard advances past a
        // previously observed upper, yielding the id, the write handle, and
        // the new upper.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

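        // Helper for (re-)creating a future that resolves once a shard's
        // upper advances past `prev_upper`, yielding the new upper together
        // with the handle so it can be re-enqueued.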
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

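        // Main event loop: multiplex upper advancements (from the txns shard
        // and from individual collection shards) with commands and read-hold
        // changes.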
        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            // The update is for the currently registered
                            // shard, so apply it and wait for the next one.
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // The collection was re-registered with a
                            // different shard in the meantime; this handle is
                            // stale, so expire it.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        // The sender side is gone, shut down.
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            if is_in_txns {
                                // Collections in the txns set get their upper
                                // updates from the txns shard instead.
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }

                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            // NB: The requested as-of might not be available
                            // yet, and the returned future only resolves once
                            // it is, so we hand the future back to the caller
                            // to await instead of blocking this event loop.
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            // It's fine if the receiver has gone away.
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    // Opportunistically drain the channel to batch up changes.
                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

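    /// Applies the given write frontier updates and downgrades read
    /// capabilities according to each collection's read policy.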
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "reference to absent collection {id}, likely due to concurrent removal"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

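    /// Downgrades the sinces of the given collections. A downgrade to the
    /// empty antichain means the collection was dropped: we relinquish our
    /// handles and, if enabled, enqueue its shard for finalization.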
    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                // This can happen when a collection is dropped concurrently.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

            let epoch = since_handle.opaque().clone();
            let result = if new_since.is_empty() {
                // An empty since means the collection was dropped: downgrade
                // unconditionally and relinquish our handles.
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                // The collection might also be in the txns set; clean that up
                // as well.
                self.txns_shards.remove(&id);

                if !self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                    return;
                }

                info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");

                self.finalizable_shards.lock().insert(dropped_shard_id);

                res
            } else {
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
}

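/// Configuration for [`finalize_shards_task`].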
struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}

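/// A task that periodically attempts to finalize shards that have been
/// marked as finalizable: it advances each shard's upper to the empty
/// antichain (by appending an empty batch), optionally force-downgrades the
/// critical since, and then finalizes the shard.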
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        // Clone the set of shards so we don't hold the lock across the await
        // points below.
        let current_finalizable_shards = {
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // Advancing the upper to the empty antichain requires
                        // a write handle, which in turn requires the shard's
                        // latest schema (if it has one).
                        let schemas = persist_client
                            .latest_schema::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                diagnostics.clone(),
                            )
                            .await
                            .expect("codecs have not changed");
                        let (key_schema, val_schema) = match schemas {
                            Some((_, key_schema, val_schema)) => (key_schema, val_schema),
                            None => (RelationDesc::empty(), UnitSchema),
                        };

                        let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(key_schema),
                                    Arc::new(val_schema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");

                        let upper = write_handle.upper();

                        if !upper.is_empty() {
                            let append = write_handle
                                .append(empty_batch, upper.clone(), Antichain::new())
                                .await?;

                            if let Err(e) = append {
                                warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
                                return Ok(());
                            }
                        }
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
                            // If the handle already carries a newer epoch,
                            // expecting our own would fail the
                            // compare-and-downgrade and surface the fencing;
                            // if ours is newer, expect the handle's current
                            // epoch so the downgrade can succeed.
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                handle_epoch
                            } else {
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!(
                                    "tried to finalize a shard with an advancing epoch: {e:?}"
                                );
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            // Leave the shard in the finalizable set; we'll
                            // retry on the next tick.
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            // Bound the number of shards we finalize concurrently.
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // Move the shard from the finalizable set to the
                        // finalized set, holding both locks so the sets stay
                        // consistent with each other.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

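/// The frontier at which to report [`SnapshotStats`] for a collection.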
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard at a direct as-of.
    Direct(Antichain<T>),
    /// Stats for a txns-managed shard at a given [`DataSnapshot`].
    Txns(DataSnapshot<T>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // Requests for unregistered collections are errors.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Requests for an as-of that isn't ready yet don't resolve
        // immediately.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Keep a pending request around; it should resolve once the shard
        // advances past ts 1.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write one update and advance the upper to 1.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Stats as of ts 0 are now available.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write a second update and advance the upper to 2, which also
        // unblocks the pending ts 1 request.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        drop(background_task);
    }

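    /// Test helper: requests snapshot stats for `id` at `as_of` from the
    /// background task and awaits the result.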
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}