use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;

use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
    SourceExportDataConfig, Timeline,
};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

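/// An abstraction for tracking storage collections and their frontiers:
/// clients can look up metadata and frontiers, take snapshots, acquire read
/// holds, and install read policies, while the implementation keeps the
/// persist sinces of the backing shards in lockstep.
///
/// A minimal usage sketch (illustrative only; `storage_collections` and `id`
/// are assumed to be in scope):
///
/// ```ignore
/// let frontiers = storage_collections.collection_frontiers(id)?;
/// // The overall read frontier is never beyond the implied capability.
/// assert!(PartialOrder::less_equal(
///     &frontiers.read_capabilities,
///     &frontiers.implied_capability,
/// ));
/// ```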
76#[async_trait]
90pub trait StorageCollections: Debug {
91 type Timestamp: TimelyTimestamp;
92
93 async fn initialize_state(
100 &self,
101 txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
102 init_ids: BTreeSet<GlobalId>,
103 ) -> Result<(), StorageError<Self::Timestamp>>;
104
105 fn update_parameters(&self, config_params: StorageParameters);
107
108 fn collection_metadata(
110 &self,
111 id: GlobalId,
112 ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;
113
114 fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
120
121 fn collection_frontiers(
123 &self,
124 id: GlobalId,
125 ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
126 let frontiers = self
127 .collections_frontiers(vec![id])?
128 .expect_element(|| "known to exist");
129
130 Ok(frontiers)
131 }
132
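    /// Returns the frontiers of the identified collections, in the same
    /// order as the given `ids`; errors if any of them is unknown.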
    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
}

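/// A cursor over a snapshot, allowing the snapshot to be consumed in chunks
/// via repeated calls to [`SnapshotCursor::next`]. Holds on to the
/// [`ReadHandle`] that backs the cursor, so the underlying persist lease
/// stays valid while the snapshot is being read.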
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<
        impl Iterator<
            Item = (
                (Result<SourceData, String>, Result<(), String>),
                T,
                StorageDiff,
            ),
        > + Sized
        + '_,
    > {
        self.cursor.next().await
    }
}

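/// Frontiers of the collection identified by `id`, as of the moment they
/// were looked up: the write frontier (upper), the implied capability
/// derived from the collection's read policy, and the overall frontier of
/// read capabilities, which additionally accounts for read holds.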
382#[derive(Debug)]
384pub struct CollectionFrontiers<T> {
385 pub id: GlobalId,
387
388 pub write_frontier: Antichain<T>,
390
391 pub implied_capability: Antichain<T>,
398
399 pub read_capabilities: Antichain<T>,
402}

#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this instance of the controller.
    envd_epoch: NonZeroI64,

    /// Whether this instance is in read-only mode. In read-only mode we must
    /// not affect changes to external state, so we only use leased (not
    /// critical) persist handles.
    read_only: bool,

    /// The set of shards that are eligible for finalization.
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of shards that have been finalized but not yet recorded as
    /// such in durable state.
    finalized_shards: Arc<ShardIdSet>,

    /// The in-memory state for each collection.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A shared [`TxnsRead`] handle for the txn-wal shard.
    txns_read: TxnsRead<T>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn-wal shard as observed at startup. Used to bump
    /// the in-memory write frontier of tables that lag behind it.
    initial_txn_upper: Antichain<T>,

    /// The persist location where all storage collections live.
    persist_location: PersistLocation,

    /// A cache of persist clients for opening handles.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our background task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// For sending read-hold changes to our background task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles to the tasks we own, so they are aborted when we are dropped.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
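    /// Creates and initializes a new instance.
    ///
    /// The txn-wal shard must already have been recorded in the given
    /// [`StorageTxn`] (via `initialize_state`/`prepare_state`), otherwise
    /// this panics when fetching it.
    ///
    /// A minimal construction sketch (illustrative only; `location`,
    /// `clients`, `registry`, `metrics`, `epoch`, `ctx`, and `txn` are
    /// assumed to be in scope and valid):
    ///
    /// ```ignore
    /// let collections = StorageCollectionsImpl::<mz_repr::Timestamp>::new(
    ///     location, clients, &registry, mz_ore::now::SYSTEM_TIME.clone(),
    ///     metrics, epoch, /* read_only */ true, ctx, &*txn,
    /// )
    /// .await;
    /// ```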
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

    /// Opens a write handle and a since handle for the given shard.
    ///
    /// `since` is an optional since that the identified collection is
    /// expected to have, not the since of the returned handle.
    ///
    /// In read-only mode the since handle is backed by a leased read handle,
    /// so that we never touch the critical since capability.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // Make sure we have a recent upper cached on the handle.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    /// Opens a write handle for the given shard.
    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

    /// Opens a critical since handle for the given shard.
    ///
    /// `since` is an optional since that the identified collection is
    /// expected to have, not the since of the handle itself.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            // This block ensures the handle is in terms of our epoch by the
            // time we return it.
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided since, so
            // the handle is never behind what the caller expects.
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                // Ensure the current epoch does not exceed ours: a greater
                // epoch means another instance has fenced us out.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Repeat the check atomically, swapping in our epoch as
                    // the handle's opaque value while downgrading the since.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

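    /// Opens a leased [`ReadHandle`], used in read-only mode for holding
    /// back the since of the given shard without touching the critical since
    /// capability.
    ///
    /// `since` is an optional since that the identified collection is
    /// expected to have, not the since of the handle itself.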
    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        // Take the join of the handle's since and the provided since, so the
        // handle is never behind what the caller expects.
        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        // The stats request is brokered through the background task, which
        // owns the read handles.
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

    /// Installs read capabilities on a collection's dependencies, at the
    /// collection's implied capability.
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

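    /// Determines the dependencies of the given collection from its data
    /// source: a table with a `primary` depends on that primary, an
    /// ingestion (and most of its exports) depends on its remap collection,
    /// and a sink depends on its `from` collection.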
    fn determine_collection_dependencies(
        &self,
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependencies = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => Vec::new(),
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                // Ingestion exports depend on their primary source's remap
                // collection.
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => Vec::new(),
                    _ => vec![ingestion.remap_collection_id],
                }
            }
            // Ingestions depend on their remap collection (unless they are
            // their own remap collection).
            DataSource::Ingestion(ingestion) => {
                if ingestion.remap_collection_id == source_id {
                    vec![]
                } else {
                    vec![ingestion.remap_collection_id]
                }
            }
            DataSource::Sink { desc } => vec![desc.sink.from],
        };

        Ok(dependencies)
    }

    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        // We open this write handle only to probe the shard's upper; it is
        // dropped right after.
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        // We create a new read handle every time someone requests a snapshot
        // and then immediately expire it, instead of keeping a read handle
        // in our state, to avoid having it heartbeat continually. The
        // assumption is that snapshots are rare enough that a fresh handle
        // each time is worth it.
        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    /// Returns the contents of the collection identified by `id` at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // The collection is not managed by txn-wal, so we can
                    // take a snapshot directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // The collection is managed by txn-wal: first make sure
                    // the txns shard has been read up past `as_of`, then ask
                    // it how to render a consistent snapshot of the data
                    // shard at `as_of` (see the docs on `DataSnapshot` for
                    // why this is correct).
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.expect("invalid protobuf data").0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    /// Returns a stream of the contents of the collection identified by `id`
    /// at `as_of`.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    // The collection is not managed by txn-wal, so we can
                    // stream a snapshot directly.
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            // Map away the decode errors; we expect the data to decode
            // cleanly.
            let stream = stream
                .map(|((k, _v), t, d)| {
                    let data = k.expect("error while streaming from Persist");
                    (data, t, d)
                })
                .boxed();
            Ok(stream)
        }
        .boxed()
    }

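    /// Applies the given read policies, destructively updating each
    /// collection's implied capability where the policy allows it to
    /// advance. Changes to implied capabilities are propagated to
    /// dependencies and, ultimately, to persist sinces via
    /// [`Self::update_read_capabilities_inner`].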
    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

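    /// Applies `updates` to the read capabilities of the affected
    /// collections, transitively forwarding the changes to each collection's
    /// storage dependencies, and instructs the background task to downgrade
    /// the persist sinces of shards whose overall read frontier advanced.
    ///
    /// A minimal sketch of the [`ChangeBatch`] accounting used here
    /// (illustrative only; the timestamps are made up):
    ///
    /// ```ignore
    /// let mut update = ChangeBatch::new();
    /// update.update(5u64, 1); // acquire a capability at time 5...
    /// update.update(3u64, -1); // ...and release the old one at time 3
    /// ```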
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Location to record consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the largest remaining ID and its updates.
        // Collections generally have greater IDs than their dependencies, so
        // processing from the largest downward lets forwarded updates join
        // any still-pending updates for the dependency.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Continue purely negative updates. Someone has probably
                    // already dropped this collection!
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate the net changes into downgrades of persist sinces.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // If a table has a "primary" collection, the primary is the
                // one that drives persist compaction for the shared shard.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = !matches!(
                    collection.description.data_source,
                    DataSource::Table { primary: Some(_) }
                );

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }

    /// Forgets about any shards that are now known to be finalized.
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

1378#[async_trait]
1380impl<T> StorageCollections for StorageCollectionsImpl<T>
1381where
1382 T: TimelyTimestamp
1383 + Lattice
1384 + Codec64
1385 + From<EpochMillis>
1386 + TimestampManipulation
1387 + Into<mz_repr::Timestamp>
1388 + Sync,
1389{
1390 type Timestamp = T;
1391
1392 async fn initialize_state(
1393 &self,
1394 txn: &mut (dyn StorageTxn<T> + Send),
1395 init_ids: BTreeSet<GlobalId>,
1396 ) -> Result<(), StorageError<T>> {
1397 let metadata = txn.get_collection_metadata();
1398 let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1399
1400 let new_collections: BTreeSet<GlobalId> =
1402 init_ids.difference(&existing_metadata).cloned().collect();
1403
1404 self.prepare_state(
1405 txn,
1406 new_collections,
1407 BTreeSet::default(),
1408 BTreeMap::default(),
1409 )
1410 .await?;
1411
1412 let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1420
1421 info!(?unfinalized_shards, "initializing finalizable_shards");
1422
1423 self.finalizable_shards.lock().extend(unfinalized_shards);
1424
1425 Ok(())
1426 }
1427
1428 fn update_parameters(&self, config_params: StorageParameters) {
1429 config_params.dyncfg_updates.apply(self.persist.cfg());
1432
1433 self.config
1434 .lock()
1435 .expect("lock poisoned")
1436 .update(config_params);
1437 }
1438
1439 fn collection_metadata(
1440 &self,
1441 id: GlobalId,
1442 ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
1443 let collections = self.collections.lock().expect("lock poisoned");
1444
1445 collections
1446 .get(&id)
1447 .map(|c| c.collection_metadata.clone())
1448 .ok_or(StorageError::IdentifierMissing(id))
1449 }
1450
1451 fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1452 let collections = self.collections.lock().expect("lock poisoned");
1453
1454 collections
1455 .iter()
1456 .filter(|(_id, c)| !c.is_dropped())
1457 .map(|(id, c)| (*id, c.collection_metadata.clone()))
1458 .collect()
1459 }
1460
1461 fn collections_frontiers(
1462 &self,
1463 ids: Vec<GlobalId>,
1464 ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
1465 if ids.is_empty() {
1466 return Ok(vec![]);
1467 }
1468
1469 let collections = self.collections.lock().expect("lock poisoned");
1470
1471 let res = ids
1472 .into_iter()
1473 .map(|id| {
1474 collections
1475 .get(&id)
1476 .map(|c| CollectionFrontiers {
1477 id: id.clone(),
1478 write_frontier: c.write_frontier.clone(),
1479 implied_capability: c.implied_capability.clone(),
1480 read_capabilities: c.read_capabilities.frontier().to_owned(),
1481 })
1482 .ok_or(StorageError::IdentifierMissing(id))
1483 })
1484 .collect::<Result<Vec<_>, _>>()?;
1485
1486 Ok(res)
1487 }
1488
1489 fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1490 let collections = self.collections.lock().expect("lock poisoned");
1491
1492 let res = collections
1493 .iter()
1494 .filter(|(_id, c)| !c.is_dropped())
1495 .map(|(id, c)| CollectionFrontiers {
1496 id: id.clone(),
1497 write_frontier: c.write_frontier.clone(),
1498 implied_capability: c.implied_capability.clone(),
1499 read_capabilities: c.read_capabilities.frontier().to_owned(),
1500 })
1501 .collect_vec();
1502
1503 res
1504 }
1505
1506 async fn snapshot_stats(
1507 &self,
1508 id: GlobalId,
1509 as_of: Antichain<Self::Timestamp>,
1510 ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1511 let metadata = self.collection_metadata(id)?;
1512
1513 let as_of = match metadata.txns_shard.as_ref() {
1516 None => SnapshotStatsAsOf::Direct(as_of),
1517 Some(txns_id) => {
1518 assert_eq!(txns_id, self.txns_read.txns_id());
1519 let as_of = as_of
1520 .into_option()
1521 .expect("cannot read as_of the empty antichain");
1522 self.txns_read.update_gt(as_of.clone()).await;
1523 let data_snapshot = self
1524 .txns_read
1525 .data_snapshot(metadata.data_shard, as_of.clone())
1526 .await;
1527 SnapshotStatsAsOf::Txns(data_snapshot)
1528 }
1529 };
1530 self.snapshot_stats_inner(id, as_of).await
1531 }
1532
1533 async fn snapshot_parts_stats(
1534 &self,
1535 id: GlobalId,
1536 as_of: Antichain<Self::Timestamp>,
1537 ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
1538 let metadata = {
1539 let self_collections = self.collections.lock().expect("lock poisoned");
1540
1541 let collection_metadata = self_collections
1542 .get(&id)
1543 .ok_or(StorageError::IdentifierMissing(id))
1544 .map(|c| c.collection_metadata.clone());
1545
1546 match collection_metadata {
1547 Ok(m) => m,
1548 Err(e) => return Box::pin(async move { Err(e) }),
1549 }
1550 };
1551
1552 let persist = Arc::clone(&self.persist);
1555 let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;
1556
1557 let data_snapshot = match (metadata, as_of.as_option()) {
1558 (
1559 CollectionMetadata {
1560 txns_shard: Some(txns_id),
1561 data_shard,
1562 ..
1563 },
1564 Some(as_of),
1565 ) => {
1566 assert_eq!(txns_id, *self.txns_read.txns_id());
1567 self.txns_read.update_gt(as_of.clone()).await;
1568 let data_snapshot = self
1569 .txns_read
1570 .data_snapshot(data_shard, as_of.clone())
1571 .await;
1572 Some(data_snapshot)
1573 }
1574 _ => None,
1575 };
1576
1577 Box::pin(async move {
1578 let read_handle = read_handle?;
1579 let result = match data_snapshot {
1580 Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
1581 None => read_handle.snapshot_parts_stats(as_of).await,
1582 };
1583 read_handle.expire().await;
1584 result.map_err(|_| StorageError::ReadBeforeSince(id))
1585 })
1586 }
1587
1588 fn snapshot(
1594 &self,
1595 id: GlobalId,
1596 as_of: Self::Timestamp,
1597 ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
1598 self.snapshot(id, as_of, &self.txns_read)
1599 }
1600
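    /// Reads the contents of `id` at the most recent timestamp that is
    /// immediately available, i.e. the upper stepped back by one. Panics if
    /// the snapshot does not accumulate to a set of rows (every diff must be
    /// exactly 1).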
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                // The collection must be empty!
                Vec::new()
            }
            _ => {
                // The collection is closed (the upper is the empty
                // antichain), so there is no timestamp we could read at.
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        // See the comments in `snapshot` for the txn-wal handling here.
        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        // Delete the metadata for any dropped collections.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        // Reconcile any shards we have successfully finalized with the
        // durable state.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

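    /// Creates the described collections, opening their persist handles and
    /// installing their in-memory state. `register_ts` must be provided when
    /// any of the collections is a table, and is used as the initial since
    /// for newly-created (non-migrated) tables.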
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state: reject duplicate IDs
        // within the command itself.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            // If we already know about a collection, it must be described
            // the same way; otherwise the ID is being reused.
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        // Enrich each collection description with its metadata.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the collection is managed via the txns shard, record
                // that in its metadata.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // Open persist handles for all collections concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // For new (non-migrated) tables, advance the since to the
                    // registration timestamp.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table { .. } => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Poll the per-collection futures concurrently, at most 50 at a
            // time.
            .buffer_unordered(50)
            .try_collect()
            .await?;

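        /// The order in which to register collections: a collection's
        /// dependencies must be registered before the collection itself.
        /// Tables come first, in reverse ID order, because an altered table
        /// (an older ID) depends on its newer primary collection; sinks come
        /// last because they depend on the collections they export.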
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            Table(Reverse<GlobalId>),
            Collection(GlobalId),
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies = self.determine_collection_dependencies(
                &*self_collections,
                id,
                &description.data_source,
            )?;

            // The collection's since must not be behind the since of its (at
            // most one) dependency.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // Adopt the dependency's since as our initial since,
                        // but flag the suspicious case where it has already
                        // advanced past our upper: reads between the shard
                        // since and the dependency since would be incorrect.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the primary source to contain this export.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table { .. } => {
                    // Tables are written through the txns shard: make sure
                    // the in-memory write frontier is at least the txn upper
                    // we observed at startup.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If the collection has dependencies, install read holds on them.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // The caller is responsible for checking the validity of the
        // alteration; here we just update our in-memory state.
        let mut self_collections = self.collections.lock().expect("lock poisoned");
        let collection = self_collections
            .get_mut(&ingestion_id)
            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;

        let curr_ingestion = match &mut collection.description.data_source {
            DataSource::Ingestion(active_ingestion) => active_ingestion,
            _ => unreachable!("verified collection refers to ingestion"),
        };

        curr_ingestion.desc = source_desc;
        debug!("altered {ingestion_id}'s SourceDesc");

        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (source_export_id, new_data_config) in source_exports {
            // The data config must be updated in two places: on the export
            // collection itself, and on the copy stored in the ingestion's
            // `source_exports`.
            let source_export_collection = self_collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.description.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            let ingestion_collection = self_collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.description.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                                export {source_export_id} of {ingestion_id} but \
                                the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, conn) in source_connections {
            let collection = self_collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.description.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is no sense in
                    // updating it.
                    if ingestion.desc.connection != conn {
                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                    } else {
                        warn!(
                            "update_source_connection called on {id} but the \
                                connection was the same"
                        );
                    }
                }
                o => {
                    warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        Ok(())
    }

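    /// Evolves the schema of the table backed by `existing_collection`,
    /// registering `new_collection` as the new primary collection for the
    /// same data shard. The existing collection becomes
    /// `DataSource::Table { primary: Some(new_collection) }` and depends on
    /// the new collection from then on.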
2254 async fn alter_table_desc(
2255 &self,
2256 existing_collection: GlobalId,
2257 new_collection: GlobalId,
2258 new_desc: RelationDesc,
2259 expected_version: RelationVersion,
2260 ) -> Result<(), StorageError<Self::Timestamp>> {
2261 let data_shard = {
2262 let self_collections = self.collections.lock().expect("lock poisoned");
2263 let existing = self_collections
2264 .get(&existing_collection)
2265 .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2266
2267 if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
2269 return Err(StorageError::IdentifierInvalid(existing_collection));
2270 }
2271
2272 existing.collection_metadata.data_shard
2273 };
2274
2275 let persist_client = self
2276 .persist
2277 .open(self.persist_location.clone())
2278 .await
2279 .unwrap();
2280
2281 let diagnostics = Diagnostics {
2283 shard_name: existing_collection.to_string(),
2284 handle_purpose: "alter_table_desc".to_string(),
2285 };
2286 let expected_schema = expected_version.into();
2288 let schema_result = persist_client
2289 .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2290 data_shard,
2291 expected_schema,
2292 &new_desc,
2293 &UnitSchema,
2294 diagnostics,
2295 )
2296 .await
2297 .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2298 tracing::info!(
2299 ?existing_collection,
2300 ?new_collection,
2301 ?new_desc,
2302 "evolved schema"
2303 );
2304
2305 match schema_result {
2306 CaESchema::Ok(id) => id,
2307 CaESchema::ExpectedMismatch {
2309 schema_id,
2310 key,
2311 val,
2312 } => {
2313 mz_ore::soft_panic_or_log!(
2314 "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2315 );
2316 return Err(StorageError::Generic(anyhow::anyhow!(
2317 "schema expected mismatch, {existing_collection:?}",
2318 )));
2319 }
2320 CaESchema::Incompatible => {
2321 mz_ore::soft_panic_or_log!(
2322 "incompatible schema! {existing_collection} {new_desc:?}"
2323 );
2324 return Err(StorageError::Generic(anyhow::anyhow!(
2325 "schema incompatible, {existing_collection:?}"
2326 )));
2327 }
2328 };
2329
2330 let (write_handle, since_handle) = self
2332 .open_data_handles(
2333 &new_collection,
2334 data_shard,
2335 None,
2336 new_desc.clone(),
2337 &persist_client,
2338 )
2339 .await;
2340
2341 {
2347 let mut self_collections = self.collections.lock().expect("lock poisoned");
2348
2349 let existing = self_collections
2351 .get_mut(&existing_collection)
2352 .expect("existing collection missing");
2353
2354 assert!(matches!(
2356 existing.description.data_source,
2357 DataSource::Table { primary: None }
2358 ));
2359
2360 existing.description.data_source = DataSource::Table {
2362 primary: Some(new_collection),
2363 };
2364 existing.storage_dependencies.push(new_collection);
2365
2366 let implied_capability = existing.read_capabilities.frontier().to_owned();
2370 let write_frontier = existing.write_frontier.clone();
2371
2372 let mut changes = ChangeBatch::new();
2379 changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2380
2381 let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
2383 let collection_meta = CollectionMetadata {
2384 persist_location: self.persist_location.clone(),
2385 relation_desc: collection_desc.desc.clone(),
2386 data_shard,
2387 txns_shard: Some(self.txns_read.txns_id().clone()),
2388 };
2389 let collection_state = CollectionState::new(
2390 collection_desc,
2391 implied_capability,
2392 write_frontier,
2393 Vec::new(),
2394 collection_meta,
2395 );
2396
2397 self_collections.insert(new_collection, collection_state);
2399
2400 let mut updates = BTreeMap::from([(new_collection, changes)]);
2401 StorageCollectionsImpl::update_read_capabilities_inner(
2402 &self.cmd_tx,
2403 &mut *self_collections,
2404 &mut updates,
2405 );
2406 };
2407
2408 self.register_handles(new_collection, true, since_handle, write_handle);
2410
2411 info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2412
2413 Ok(())
2414 }
2415
2416 fn drop_collections_unvalidated(
2417 &self,
2418 storage_metadata: &StorageMetadata,
2419 identifiers: Vec<GlobalId>,
2420 ) {
2421 debug!(?identifiers, "drop_collections_unvalidated");
2422
2423 let mut self_collections = self.collections.lock().expect("lock poisoned");
2424
2425 for id in identifiers.iter() {
2426 let metadata = storage_metadata.get_collection_shard::<T>(*id);
2427 mz_ore::soft_assert_or_log!(
2428 matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2429 "dropping {id}, but drop was not synchronized with storage \
2430 controller via `synchronize_collections`"
2431 );
2432
2433 let dropped_data_source = match self_collections.get(id) {
2434 Some(col) => col.description.data_source.clone(),
2435 None => continue,
2436 };
2437
2438 if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
2441 let ingestion = match self_collections.get_mut(&ingestion_id) {
2443 Some(ingestion) => ingestion,
2444 None => {
2446 tracing::error!(
2447 "primary source {ingestion_id} seemingly dropped before subsource {id}",
2448 );
2449 continue;
2450 }
2451 };
2452
2453 match &mut ingestion.description {
2454 CollectionDescription {
2455 data_source: DataSource::Ingestion(ingestion_desc),
2456 ..
2457 } => {
2458 let removed = ingestion_desc.source_exports.remove(id);
2459 mz_ore::soft_assert_or_log!(
2460 removed.is_some(),
2461 "dropped subsource {id} already removed from source exports"
2462 );
2463 }
2464 _ => unreachable!(
2465 "SourceExport must only refer to primary sources that already exist"
2466 ),
2467 };
2468 }
2469 }
2470
2471 let mut finalized_policies = Vec::new();
2479
2480 for id in identifiers {
2481 if self_collections.contains_key(&id) {
2483 finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2484 }
2485 }
2486 self.set_read_policies_inner(&mut self_collections, finalized_policies);
2487
2488 drop(self_collections);
2489
2490 self.synchronize_finalized_shards(storage_metadata);
2491 }
2492
2493 fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2494 let mut collections = self.collections.lock().expect("lock poisoned");
2495
2496 if tracing::enabled!(tracing::Level::TRACE) {
2497 let user_capabilities = collections
2498 .iter_mut()
2499 .filter(|(id, _c)| id.is_user())
2500 .map(|(id, c)| {
2501 let updates = c.read_capabilities.updates().cloned().collect_vec();
2502 (*id, c.implied_capability.clone(), updates)
2503 })
2504 .collect_vec();
2505
2506 trace!(?policies, ?user_capabilities, "set_read_policies");
2507 }
2508
2509 self.set_read_policies_inner(&mut collections, policies);
2510
2511 if tracing::enabled!(tracing::Level::TRACE) {
2512 let user_capabilities = collections
2513 .iter_mut()
2514 .filter(|(id, _c)| id.is_user())
2515 .map(|(id, c)| {
2516 let updates = c.read_capabilities.updates().cloned().collect_vec();
2517 (*id, c.implied_capability.clone(), updates)
2518 })
2519 .collect_vec();
2520
2521 trace!(?user_capabilities, "after! set_read_policies");
2522 }
2523 }
2524
2525 fn acquire_read_holds(
2526 &self,
2527 desired_holds: Vec<GlobalId>,
2528 ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
2529 if desired_holds.is_empty() {
2530 return Ok(vec![]);
2531 }
2532
2533 let mut collections = self.collections.lock().expect("lock poisoned");
2534
2535 let mut advanced_holds = Vec::new();
2536 for id in desired_holds.iter() {
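            // Acquire each hold at the collection's current overall read
            // frontier, the earliest time that is still readable across all
            // existing capabilities.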
2547 let collection = collections
2548 .get(id)
2549 .ok_or(ReadHoldError::CollectionMissing(*id))?;
2550 let since = collection.read_capabilities.frontier().to_owned();
2551 advanced_holds.push((*id, since));
2552 }
2553
2554 let mut updates = advanced_holds
2555 .iter()
2556 .map(|(id, hold)| {
2557 let mut changes = ChangeBatch::new();
2558 changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2559 (*id, changes)
2560 })
2561 .collect::<BTreeMap<_, _>>();
2562
2563 StorageCollectionsImpl::update_read_capabilities_inner(
2564 &self.cmd_tx,
2565 &mut collections,
2566 &mut updates,
2567 );
2568
2569 let acquired_holds = advanced_holds
2570 .into_iter()
2571 .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2572 .collect_vec();
2573
2574 trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2575
2576 Ok(acquired_holds)
2577 }
2578
2579 fn determine_time_dependence(
2581 &self,
2582 id: GlobalId,
2583 ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2584 use TimeDependenceError::CollectionMissing;
2585 let collections = self.collections.lock().expect("lock poisoned");
2586 let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);
2587
2588 let mut result = None;
2589
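        // Walk the dependency chain: an ingestion export inherits its time
        // dependence from the primary ingestion that feeds it, so we may hop
        // from the export to its ingestion before settling on a result.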
2590 while let Some(c) = collection.take() {
2591 use DataSource::*;
2592 if let Some(timeline) = &c.description.timeline {
2593 if *timeline != Timeline::EpochMilliseconds {
2595 break;
2596 }
2597 }
2598 match &c.description.data_source {
2599 Ingestion(ingestion) => {
2600 use GenericSourceConnection::*;
2601 match ingestion.desc.connection {
2602 Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
2605 result = Some(TimeDependence::default())
2606 }
2607 LoadGenerator(_) => {}
2609 }
2610 }
2611 IngestionExport { ingestion_id, .. } => {
2612 let c = collections
2613 .get(ingestion_id)
2614 .ok_or(CollectionMissing(*ingestion_id))?;
2615 collection = Some(c);
2616 }
2617 Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2619 result = Some(TimeDependence::default())
2620 }
2621 Other => {}
2623 Sink { .. } => {}
2624 };
2625 }
2626 Ok(result)
2627 }
2628}
2629
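/// A wrapper that lets the rest of this module treat a critical
/// (epoch-fenced) [`SinceHandle`] and a leased [`ReadHandle`] uniformly when
/// reading or downgrading the since of a collection's data shard.
///
/// A sketch of the intended usage, assuming a handle obtained via
/// `open_data_handles` elsewhere in this module (illustrative, not compiled):
///
/// ```ignore
/// let mut handle: SinceHandleWrapper<mz_repr::Timestamp> = /* ... */;
/// let epoch = handle.opaque();
/// let new_since = Antichain::from_elem(42.into());
/// // Succeeds unconditionally for leased handles; for critical handles,
/// // only if the expected epoch still matches.
/// let _ = handle
///     .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
///     .await;
/// ```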
2630#[derive(Debug)]
2637enum SinceHandleWrapper<T>
2638where
2639 T: TimelyTimestamp + Lattice + Codec64,
2640{
2641 Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
2642 Leased(ReadHandle<SourceData, (), T, StorageDiff>),
2643}
2644
2645impl<T> SinceHandleWrapper<T>
2646where
2647 T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2648{
2649 pub fn since(&self) -> &Antichain<T> {
2650 match self {
2651 Self::Critical(handle) => handle.since(),
2652 Self::Leased(handle) => handle.since(),
2653 }
2654 }
2655
2656 pub fn opaque(&self) -> PersistEpoch {
2657 match self {
2658 Self::Critical(handle) => handle.opaque().clone(),
2659 Self::Leased(_handle) => {
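                // Leased read handles have no opaque epoch, so we report the
                // zero value; `compare_and_downgrade_since` below asserts
                // that callers always pass `PersistEpoch(None)` for leased
                // handles.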
2660 PersistEpoch(None)
2665 }
2666 }
2667 }
2668
2669 pub async fn compare_and_downgrade_since(
2670 &mut self,
2671 expected: &PersistEpoch,
2672 new: (&PersistEpoch, &Antichain<T>),
2673 ) -> Result<Antichain<T>, PersistEpoch> {
2674 match self {
2675 Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
2676 Self::Leased(handle) => {
2677 let (opaque, since) = new;
2678 assert_none!(opaque.0);
2679
2680 handle.downgrade_since(since).await;
2681
2682 Ok(since.clone())
2683 }
2684 }
2685 }
2686
2687 pub async fn maybe_compare_and_downgrade_since(
2688 &mut self,
2689 expected: &PersistEpoch,
2690 new: (&PersistEpoch, &Antichain<T>),
2691 ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2692 match self {
2693 Self::Critical(handle) => {
2694 handle
2695 .maybe_compare_and_downgrade_since(expected, new)
2696 .await
2697 }
2698 Self::Leased(handle) => {
2699 let (opaque, since) = new;
2700 assert_none!(opaque.0);
2701
2702 handle.maybe_downgrade_since(since).await;
2703
2704 Some(Ok(since.clone()))
2705 }
2706 }
2707 }
2708
2709 pub fn snapshot_stats(
2710 &self,
2711 id: GlobalId,
2712 as_of: Option<Antichain<T>>,
2713 ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2714 match self {
2715 Self::Critical(handle) => {
2716 let res = handle
2717 .snapshot_stats(as_of)
2718 .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2719 Box::pin(res)
2720 }
2721 Self::Leased(handle) => {
2722 let res = handle
2723 .snapshot_stats(as_of)
2724 .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2725 Box::pin(res)
2726 }
2727 }
2728 }
2729
2730 pub fn snapshot_stats_from_txn(
2731 &self,
2732 id: GlobalId,
2733 data_snapshot: DataSnapshot<T>,
2734 ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2735 match self {
2736 Self::Critical(handle) => Box::pin(
2737 data_snapshot
2738 .snapshot_stats_from_critical(handle)
2739 .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2740 ),
2741 Self::Leased(handle) => Box::pin(
2742 data_snapshot
2743 .snapshot_stats_from_leased(handle)
2744 .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2745 ),
2746 }
2747 }
2748}
2749
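/// Per-collection state tracked by this module: the description the
/// collection was created from, its read capabilities and the implied
/// capability derived from the read policy, its write frontier, its storage
/// dependencies, and the metadata locating its persist shard.
///
/// A minimal construction sketch, assuming `description` and `metadata` were
/// built elsewhere (illustrative, not compiled):
///
/// ```ignore
/// let since = Antichain::from_elem(T::minimum());
/// let state = CollectionState::new(
///     description,
///     since.clone(), // initial implied capability
///     since,         // initial write frontier
///     Vec::new(),    // no storage dependencies
///     metadata,
/// );
/// assert!(!state.is_dropped());
/// ```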
2750#[derive(Debug, Clone)]
2752struct CollectionState<T> {
2753 pub description: CollectionDescription<T>,
2755
2756 pub read_capabilities: MutableAntichain<T>,
2762
2763 pub implied_capability: Antichain<T>,
2767
2768 pub read_policy: ReadPolicy<T>,
2770
2771 pub storage_dependencies: Vec<GlobalId>,
2773
2774 pub write_frontier: Antichain<T>,
2776
2777 pub collection_metadata: CollectionMetadata,
2778}
2779
2780impl<T: TimelyTimestamp> CollectionState<T> {
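    /// Creates a new `CollectionState`, initializing the read capabilities
    /// and implied capability to `since`, with a `NoPolicy` read policy that
    /// simply pins the since in place until a real policy is installed.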
2781 pub fn new(
2784 description: CollectionDescription<T>,
2785 since: Antichain<T>,
2786 write_frontier: Antichain<T>,
2787 storage_dependencies: Vec<GlobalId>,
2788 metadata: CollectionMetadata,
2789 ) -> Self {
2790 let mut read_capabilities = MutableAntichain::new();
2791 read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2792 Self {
2793 description,
2794 read_capabilities,
2795 implied_capability: since.clone(),
2796 read_policy: ReadPolicy::NoPolicy {
2797 initial_since: since,
2798 },
2799 storage_dependencies,
2800 write_frontier,
2801 collection_metadata: metadata,
2802 }
2803 }
2804
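    /// Reports whether the collection was dropped, i.e. there are no longer
    /// any outstanding read capabilities holding back its since.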
2805 pub fn is_dropped(&self) -> bool {
2807 self.read_capabilities.is_empty()
2808 }
2809}
2810
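/// A task that runs in the background and owns the persist handles for all
/// registered collections: it watches shard uppers and updates write
/// frontiers, batches and applies read-hold changes, downgrades sinces, and
/// serves snapshot-stats requests.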
2811#[derive(Debug)]
2817struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
2818 config: Arc<Mutex<StorageConfiguration>>,
2819 cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
2820 cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
2821 holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
2822 finalizable_shards: Arc<ShardIdSet>,
2823 collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
2824 shard_by_id: BTreeMap<GlobalId, ShardId>,
2827 since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
2828 txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
2829 txns_shards: BTreeSet<GlobalId>,
2830}
2831
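/// Commands that a [`StorageCollections`] front-end sends to its
/// [`BackgroundTask`].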
2832#[derive(Debug)]
2833enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
2834 Register {
2835 id: GlobalId,
2836 is_in_txns: bool,
2837 write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
2838 since_handle: SinceHandleWrapper<T>,
2839 },
2840 DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
2841 SnapshotStats(
2842 GlobalId,
2843 SnapshotStatsAsOf<T>,
2844 oneshot::Sender<SnapshotStatsRes<T>>,
2845 ),
2846}
2847
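/// The response to [`BackgroundCmd::SnapshotStats`]: a future that resolves
/// to the stats, so the potentially slow persist call is awaited by the
/// requester rather than blocking the background task's main loop.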
2848pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2850
2851impl<T> Debug for SnapshotStatsRes<T> {
2852 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2853 f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2854 }
2855}
2856
2857impl<T> BackgroundTask<T>
2858where
2859 T: TimelyTimestamp
2860 + Lattice
2861 + Codec64
2862 + From<EpochMillis>
2863 + TimestampManipulation
2864 + Into<mz_repr::Timestamp>
2865 + Sync,
2866{
2867 async fn run(&mut self) {
2868 let mut upper_futures: FuturesUnordered<
2870 std::pin::Pin<
2871 Box<
2872 dyn Future<
2873 Output = (
2874 GlobalId,
2875 WriteHandle<SourceData, (), T, StorageDiff>,
2876 Antichain<T>,
2877 ),
2878 > + Send,
2879 >,
2880 >,
2881 > = FuturesUnordered::new();
2882
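        // Returns a future that resolves once the shard's upper advances past
        // `prev_upper`, handing back the write handle and the new upper so
        // the loop below can process the change and re-arm the future.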
2883 let gen_upper_future =
2884 |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
2885 let fut = async move {
2886 soft_assert_or_log!(
2887 !prev_upper.is_empty(),
2888 "cannot await progress when upper is already empty"
2889 );
2890 handle.wait_for_upper_past(&prev_upper).await;
2891 let new_upper = handle.shared_upper();
2892 (id, handle, new_upper)
2893 };
2894
2895 fut
2896 };
2897
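        // The txns shard is not itself a collection, so we track its upper
        // under a placeholder transient ID; new uppers fan out to all
        // registered txns-backed collections in the loop below.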
2898 let mut txns_upper_future = match self.txns_handle.take() {
2899 Some(txns_handle) => {
2900 let upper = txns_handle.upper().clone();
2901 let txns_upper_future =
2902 gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
2903 txns_upper_future.boxed()
2904 }
2905 None => async { std::future::pending().await }.boxed(),
2906 };
2907
2908 loop {
2909 tokio::select! {
2910 (id, handle, upper) = &mut txns_upper_future => {
2911 trace!("new upper from txns shard: {:?}", upper);
2912 let mut uppers = Vec::new();
2913 for id in self.txns_shards.iter() {
2914 uppers.push((*id, &upper));
2915 }
2916 self.update_write_frontiers(&uppers).await;
2917
2918 let fut = gen_upper_future(id, handle, upper);
2919 txns_upper_future = fut.boxed();
2920 }
2921 Some((id, handle, upper)) = upper_futures.next() => {
2922 if id.is_user() {
2923 trace!("new upper for collection {id}: {:?}", upper);
2924 }
2925 let current_shard = self.shard_by_id.get(&id);
2926 if let Some(shard_id) = current_shard {
2927 if shard_id == &handle.shard_id() {
2928 let uppers = &[(id, &upper)];
2931 self.update_write_frontiers(uppers).await;
2932 if !upper.is_empty() {
2933 let fut = gen_upper_future(id, handle, upper);
2934 upper_futures.push(fut.boxed());
2935 }
2936 } else {
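                            // The collection was re-registered with a
                            // different shard while we were waiting, so this
                            // write handle is stale; expire it rather than
                            // re-arming the upper future.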
2937 handle.expire().await;
2941 }
2942 }
2943 }
2944 cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
2951
2952 match cmd {
2953 BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
2954 debug!("registering handles for {}", id);
2955 let previous = self.shard_by_id.insert(id, write_handle.shard_id());
2956 if previous.is_some() {
2957 panic!("already registered a WriteHandle for collection {id}");
2958 }
2959
2960 let previous = self.since_handles.insert(id, since_handle);
2961 if previous.is_some() {
2962 panic!("already registered a SinceHandle for collection {id}");
2963 }
2964
2965 if is_in_txns {
2966 self.txns_shards.insert(id);
2967 } else {
2968 let upper = write_handle.upper().clone();
2969 if !upper.is_empty() {
2970 let fut = gen_upper_future(id, write_handle, upper);
2971 upper_futures.push(fut.boxed());
2972 }
2973 }
2974
2975 }
2976 BackgroundCmd::DowngradeSince(cmds) => {
2977 self.downgrade_sinces(cmds).await;
2978 }
2979 BackgroundCmd::SnapshotStats(id, as_of, tx) => {
2980 let res = match self.since_handles.get(&id) {
2986 Some(x) => {
2987 let fut: BoxFuture<
2988 'static,
2989 Result<SnapshotStats, StorageError<T>>,
2990 > = match as_of {
2991 SnapshotStatsAsOf::Direct(as_of) => {
2992 x.snapshot_stats(id, Some(as_of))
2993 }
2994 SnapshotStatsAsOf::Txns(data_snapshot) => {
2995 x.snapshot_stats_from_txn(id, data_snapshot)
2996 }
2997 };
2998 SnapshotStatsRes(fut)
2999 }
3000 None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
3001 StorageError::IdentifierMissing(id),
3002 )))),
3003 };
3004 let _ = tx.send(res);
3006 }
3007 }
3008 }
3009 Some(holds_changes) = self.holds_rx.recv() => {
3010 let mut batched_changes = BTreeMap::new();
3011 batched_changes.insert(holds_changes.0, holds_changes.1);
3012
3013 while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
3014 let entry = batched_changes.entry(holds_changes.0);
3015 entry
3016 .and_modify(|existing| existing.extend(holds_changes.1.drain()))
3017 .or_insert_with(|| holds_changes.1);
3018 }
3019
3020 let mut collections = self.collections.lock().expect("lock poisoned");
3021
3022 let user_changes = batched_changes
3023 .iter()
3024 .filter(|(id, _c)| id.is_user())
3025 .map(|(id, c)| {
3026 (id.clone(), c.clone())
3027 })
3028 .collect_vec();
3029
3030 if !user_changes.is_empty() {
3031 trace!(?user_changes, "applying holds changes from channel");
3032 }
3033
3034 StorageCollectionsImpl::update_read_capabilities_inner(
3035 &self.cmds_tx,
3036 &mut collections,
3037 &mut batched_changes,
3038 );
3039 }
3040 }
3041 }
3042
3043 warn!("BackgroundTask shutting down");
3044 }
3045
3046 #[instrument(level = "debug")]
3047 async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
3048 let mut read_capability_changes = BTreeMap::default();
3049
3050 let mut self_collections = self.collections.lock().expect("lock poisoned");
3051
3052 for (id, new_upper) in updates.iter() {
3053 let collection = if let Some(c) = self_collections.get_mut(id) {
3054 c
3055 } else {
3056 trace!(
3057                     "reference to absent collection {id}, likely due to concurrent removal of that collection"
3058 );
3059 continue;
3060 };
3061
3062 if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
3063 collection.write_frontier.clone_from(new_upper);
3064 }
3065
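            // Apply the read policy to the updated write frontier to derive
            // the collection's new implied read capability.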
3066 let mut new_read_capability = collection
3067 .read_policy
3068 .frontier(collection.write_frontier.borrow());
3069
3070 if id.is_user() {
3071 trace!(
3072 %id,
3073 implied_capability = ?collection.implied_capability,
3074 policy = ?collection.read_policy,
3075 write_frontier = ?collection.write_frontier,
3076 ?new_read_capability,
3077 "update_write_frontiers");
3078 }
3079
3080 if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
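                // Record the change as +1 for each element of the new
                // capability and, after the swap below, -1 for each element
                // of the previous one, which `new_read_capability` then holds.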
3081 let mut update = ChangeBatch::new();
3082 update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
3083 std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
3084 update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
3085
3086 if !update.is_empty() {
3087 read_capability_changes.insert(*id, update);
3088 }
3089 }
3090 }
3091
3092 if !read_capability_changes.is_empty() {
3093 StorageCollectionsImpl::update_read_capabilities_inner(
3094 &self.cmds_tx,
3095 &mut self_collections,
3096 &mut read_capability_changes,
3097 );
3098 }
3099 }
3100
3101 async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
3102 for (id, new_since) in cmds {
3103 let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
3104 c
3105 } else {
3106 trace!("downgrade_sinces: reference to absent collection {id}");
3108 continue;
3109 };
3110
3111 if id.is_user() {
3112 trace!("downgrading since of {} to {:?}", id, new_since);
3113 }
3114
3115             let epoch = since_handle.opaque();
3116 let result = if new_since.is_empty() {
3117 let res = Some(
3121 since_handle
3122 .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3123 .await,
3124 );
3125
3126 info!(%id, "removing persist handles because the since advanced to []!");
3127
3128 let _since_handle = self.since_handles.remove(&id).expect("known to exist");
3129 let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
3130 shard_id
3131 } else {
3132 panic!("missing GlobalId -> ShardId mapping for id {id}");
3133 };
3134
3135 self.txns_shards.remove(&id);
3140
3141 if !self
3142 .config
3143 .lock()
3144 .expect("lock poisoned")
3145 .parameters
3146 .finalize_shards
3147 {
3148 info!(
3149 "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3150 );
3151 return;
3152 }
3153
3154                 info!(%id, %dropped_shard_id, "enqueuing shard finalization due to dropped collection and dropped persist handle");
3155
3156 self.finalizable_shards.lock().insert(dropped_shard_id);
3157
3158 res
3159 } else {
3160 since_handle
3161 .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3162 .await
3163 };
3164
3165 if let Some(Err(other_epoch)) = result {
3166 mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
3167 }
3168 }
3169 }
3170}
3171
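/// Arguments for [`finalize_shards_task`], which runs in the background and
/// finalizes shards whose collections have been dropped.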
3172struct FinalizeShardsTaskConfig {
3173 envd_epoch: NonZeroI64,
3174 config: Arc<Mutex<StorageConfiguration>>,
3175 metrics: StorageCollectionsMetrics,
3176 finalizable_shards: Arc<ShardIdSet>,
3177 finalized_shards: Arc<ShardIdSet>,
3178 persist_location: PersistLocation,
3179 persist: Arc<PersistClientCache>,
3180 read_only: bool,
3181}
3182
3183async fn finalize_shards_task<T>(
3184 FinalizeShardsTaskConfig {
3185 envd_epoch,
3186 config,
3187 metrics,
3188 finalizable_shards,
3189 finalized_shards,
3190 persist_location,
3191 persist,
3192 read_only,
3193 }: FinalizeShardsTaskConfig,
3194) where
3195 T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
3196{
3197 if read_only {
3198         info!("disabling shard finalization in read-only mode");
3199 return;
3200 }
3201
3202 let mut interval = tokio::time::interval(Duration::from_secs(5));
3203 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3204 loop {
3205 interval.tick().await;
3206
3207 if !config
3208 .lock()
3209 .expect("lock poisoned")
3210 .parameters
3211 .finalize_shards
3212 {
3213 debug!(
3214 "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3215 );
3216 continue;
3217 }
3218
3219 let current_finalizable_shards = {
3220 finalizable_shards.lock().iter().cloned().collect_vec()
3223 };
3224
3225 if current_finalizable_shards.is_empty() {
3226 debug!("no shards to finalize");
3227 continue;
3228 }
3229
3230 debug!(?current_finalizable_shards, "attempting to finalize shards");
3231
3232 let persist_client = persist.open(persist_location.clone()).await.unwrap();
3234
3235 let metrics = &metrics;
3236 let finalizable_shards = &finalizable_shards;
3237 let finalized_shards = &finalized_shards;
3238 let persist_client = &persist_client;
3239 let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3240
3241 let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3242 .get(config.lock().expect("lock poisoned").config_set());
3243
3244 let epoch = &PersistEpoch::from(envd_epoch);
3245
3246 futures::stream::iter(current_finalizable_shards.clone())
3247 .map(|shard_id| async move {
3248 let persist_client = persist_client.clone();
3249 let diagnostics = diagnostics.clone();
3250 let epoch = epoch.clone();
3251
3252 metrics.finalization_started.inc();
3253
3254 let is_finalized = persist_client
3255 .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3256 .await
3257 .expect("invalid persist usage");
3258
3259 if is_finalized {
3260 debug!(%shard_id, "shard is already finalized!");
3261 Some(shard_id)
3262 } else {
3263 debug!(%shard_id, "finalizing shard");
3264 let finalize = || async move {
3265 let diagnostics = Diagnostics::from_purpose("finalizing shards");
3267
3268 let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3271 persist_client
3272 .open_writer(
3273 shard_id,
3274 Arc::new(RelationDesc::empty()),
3275 Arc::new(UnitSchema),
3276 diagnostics,
3277 )
3278 .await
3279 .expect("invalid persist usage");
3280 write_handle.advance_upper(&Antichain::new()).await;
3281 write_handle.expire().await;
3282
3283 if force_downgrade_since {
3284 let mut since_handle: SinceHandle<
3285 SourceData,
3286 (),
3287 T,
3288 StorageDiff,
3289 PersistEpoch,
3290 > = persist_client
3291 .open_critical_since(
3292 shard_id,
3293 PersistClient::CONTROLLER_CRITICAL_SINCE,
3294 Diagnostics::from_purpose("finalizing shards"),
3295 )
3296 .await
3297 .expect("invalid persist usage");
3298 let handle_epoch = since_handle.opaque().clone();
3299 let our_epoch = epoch.clone();
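                            // Pick whichever epoch is older: if ours is newer
                            // than the handle's, we are entitled to take over
                            // the handle, and expecting its current epoch will
                            // succeed; if the handle's is newer, we have been
                            // fenced, and expecting our own epoch makes the
                            // downgrade below fail cleanly.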
3300 let epoch = if our_epoch.0 > handle_epoch.0 {
3301 handle_epoch
3304 } else {
3305 our_epoch
3310 };
3311 let new_since = Antichain::new();
3312 let downgrade = since_handle
3313 .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3314 .await;
3315 if let Err(e) = downgrade {
3316 warn!("tried to finalize a shard with an advancing epoch: {e:?}");
3317 return Ok(());
3318 }
3319 }
3322
3323 persist_client
3324 .finalize_shard::<SourceData, (), T, StorageDiff>(
3325 shard_id,
3326 Diagnostics::from_purpose("finalizing shards"),
3327 )
3328 .await
3329 };
3330
3331 match finalize().await {
3332 Err(e) => {
3333 warn!("error during finalization of shard {shard_id}: {e:?}");
3336 None
3337 }
3338 Ok(()) => {
3339 debug!(%shard_id, "finalize success!");
3340 Some(shard_id)
3341 }
3342 }
3343 }
3344 })
3345 .buffer_unordered(10)
3350 .for_each(|shard_id| async move {
3354 match shard_id {
3355 None => metrics.finalization_failed.inc(),
3356 Some(shard_id) => {
3357 {
3364 let mut finalizable_shards = finalizable_shards.lock();
3365 let mut finalized_shards = finalized_shards.lock();
3366 finalizable_shards.remove(&shard_id);
3367 finalized_shards.insert(shard_id);
3368 }
3369
3370 metrics.finalization_succeeded.inc();
3371 }
3372 }
3373 })
3374 .await;
3375
3376 debug!("done finalizing shards");
3377 }
3378}
3379
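/// The as-of frontier at which a snapshot-stats request should be evaluated:
/// either directly against the data shard, or through a txn-wal
/// [`DataSnapshot`] for collections whose writes go through the txns shard.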
3380#[derive(Debug)]
3381pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
3382 Direct(Antichain<T>),
3385 Txns(DataSnapshot<T>),
3388}
3389
3390#[cfg(test)]
3391mod tests {
3392 use std::str::FromStr;
3393 use std::sync::Arc;
3394
3395 use mz_build_info::DUMMY_BUILD_INFO;
3396 use mz_dyncfg::ConfigSet;
3397 use mz_ore::assert_err;
3398 use mz_ore::metrics::{MetricsRegistry, UIntGauge};
3399 use mz_ore::now::SYSTEM_TIME;
3400 use mz_ore::url::SensitiveUrl;
3401 use mz_persist_client::cache::PersistClientCache;
3402 use mz_persist_client::cfg::PersistConfig;
3403 use mz_persist_client::rpc::PubSubClientConnection;
3404 use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
3405 use mz_persist_types::codec_impls::UnitSchema;
3406 use mz_repr::{RelationDesc, Row};
3407 use mz_secrets::InMemorySecretsController;
3408
3409 use super::*;
3410
3411     #[mz_ore::test(tokio::test)]
3412     #[cfg_attr(miri, ignore)]
3413     async fn test_snapshot_stats() {
3414 let persist_location = PersistLocation {
3415 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3416 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3417 };
3418 let persist_client = PersistClientCache::new(
3419 PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
3420 &MetricsRegistry::new(),
3421 |_, _| PubSubClientConnection::noop(),
3422 );
3423 let persist_client = Arc::new(persist_client);
3424
3425 let (cmds_tx, mut background_task) =
3426 BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
3427 let background_task =
3428 mz_ore::task::spawn(|| "storage_collections::background_task", async move {
3429 background_task.run().await
3430 });
3431
3432 let persist = persist_client.open(persist_location).await.unwrap();
3433
3434 let shard_id = ShardId::new();
3435 let since_handle = persist
3436 .open_critical_since(
3437 shard_id,
3438 PersistClient::CONTROLLER_CRITICAL_SINCE,
3439 Diagnostics::for_tests(),
3440 )
3441 .await
3442 .unwrap();
3443 let write_handle = persist
3444 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3445 shard_id,
3446 Arc::new(RelationDesc::empty()),
3447 Arc::new(UnitSchema),
3448 Diagnostics::for_tests(),
3449 )
3450 .await
3451 .unwrap();
3452
3453 cmds_tx
3454 .send(BackgroundCmd::Register {
3455 id: GlobalId::User(1),
3456 is_in_txns: false,
3457 since_handle: SinceHandleWrapper::Critical(since_handle),
3458 write_handle,
3459 })
3460 .unwrap();
3461
3462 let mut write_handle = persist
3463 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3464 shard_id,
3465 Arc::new(RelationDesc::empty()),
3466 Arc::new(UnitSchema),
3467 Diagnostics::for_tests(),
3468 )
3469 .await
3470 .unwrap();
3471
3472 let stats =
3474 snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
3475 assert_err!(stats);
3476
3477 let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3479 assert_none!(stats_fut.now_or_never());
3480
3481 let stats_ts1_fut =
3483 snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3484
3485 let data = (
3487 (SourceData(Ok(Row::default())), ()),
3488 mz_repr::Timestamp::from(0),
3489 1i64,
3490 );
3491 let () = write_handle
3492 .compare_and_append(
3493 &[data],
3494 Antichain::from_elem(0.into()),
3495 Antichain::from_elem(1.into()),
3496 )
3497 .await
3498 .unwrap()
3499 .unwrap();
3500
3501 let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
3503 .await
3504 .unwrap();
3505 assert_eq!(stats.num_updates, 1);
3506
3507 let data = (
3509 (SourceData(Ok(Row::default())), ()),
3510 mz_repr::Timestamp::from(1),
3511 1i64,
3512 );
3513 let () = write_handle
3514 .compare_and_append(
3515 &[data],
3516 Antichain::from_elem(1.into()),
3517 Antichain::from_elem(2.into()),
3518 )
3519 .await
3520 .unwrap()
3521 .unwrap();
3522
3523 let stats = stats_ts1_fut.await.unwrap();
3524 assert_eq!(stats.num_updates, 2);
3525
3526 drop(background_task);
3528 }
3529
3530 async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
3531 cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
3532 id: GlobalId,
3533 as_of: Antichain<T>,
3534 ) -> Result<SnapshotStats, StorageError<T>> {
3535 let (tx, rx) = oneshot::channel();
3536 cmds_tx
3537 .send(BackgroundCmd::SnapshotStats(
3538 id,
3539 SnapshotStatsAsOf::Direct(as_of),
3540 tx,
3541 ))
3542 .unwrap();
3543 let res = rx.await.expect("BackgroundTask should be live").0;
3544
3545 res.await
3546 }
3547
3548 impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
3549 fn new_for_test(
3550 _persist_location: PersistLocation,
3551 _persist_client: Arc<PersistClientCache>,
3552 ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
3553 let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
3554 let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
3555 let connection_context =
3556 ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));
3557
3558 let task = Self {
3559 config: Arc::new(Mutex::new(StorageConfiguration::new(
3560 connection_context,
3561 ConfigSet::default(),
3562 ))),
3563 cmds_tx: cmds_tx.clone(),
3564 cmds_rx,
3565 holds_rx,
3566 finalizable_shards: Arc::new(ShardIdSet::new(
3567 UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
3568 )),
3569 collections: Arc::new(Mutex::new(BTreeMap::new())),
3570 shard_by_id: BTreeMap::new(),
3571 since_handles: BTreeMap::new(),
3572 txns_handle: None,
3573 txns_shards: BTreeSet::new(),
3574 };
3575
3576 (cmds_tx, task)
3577 }
3578 }
3579}