use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;

use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
    SourceExportDataConfig, Timeline,
};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

/// An abstraction for keeping track of storage collections and their
/// frontiers: the source of truth for which collections exist, where their
/// data lives in persist, and how far they can be read (since) and have been
/// written (upper).
#[async_trait]
pub trait StorageCollections: Debug {
    type Timestamp: TimelyTimestamp;

    /// On boot, reconciles durable state with `txn` and ensures metadata
    /// exists for the given `init_ids`.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Updates storage configuration with new parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;

    /// Returns the metadata of all collections that are not dropped.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;

    /// Returns the frontiers of all collections that are not dropped.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    /// Returns the contents of `id` at the most recent timestamp that is
    /// readable based on the collection's upper.
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    /// Creates a [`TimestamplessUpdateBuilder`] for writing to the collection
    /// `id`.
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    /// Records changes to collection metadata in `txn`: newly created
    /// collections, dropped collections, and pre-existing shards to register.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the given collections without validating that they exist; the
    /// caller must have previously synchronized the drops with the durable
    /// storage metadata.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    /// Acquires read holds at the current read frontier of each of the given
    /// collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;

    /// Determines how the given collection relates to wall-clock time.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
}
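
/// A minimal, hypothetical usage sketch of [`StorageCollections`] (not part
/// of the API above): check that a collection exists, look up its frontiers,
/// and take a read hold so the since cannot advance underneath the caller.
/// The function name is illustrative and error handling is elided with
/// `expect` for brevity.
#[allow(dead_code)]
fn example_acquire_read_hold<S>(storage: &S, id: GlobalId)
where
    S: StorageCollections<Timestamp = mz_repr::Timestamp>,
{
    storage.check_exists(id).expect("collection exists");
    let frontiers = storage
        .collection_frontiers(id)
        .expect("collection exists");
    // The returned holds pin the collection's since at its current overall
    // read frontier until they are dropped or downgraded.
    let _holds = storage
        .acquire_read_holds(vec![id])
        .expect("collection exists");
    debug!(?frontiers.write_frontier, "acquired read hold");
}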

/// A cursor over a snapshot, allowing the snapshot's contents to be streamed
/// out in chunks. Keeps the [`ReadHandle`] it was created from alive, since
/// dropping the handle would invalidate the cursor's lease.
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<
        impl Iterator<
            Item = (
                (Result<SourceData, String>, Result<(), String>),
                T,
                StorageDiff,
            ),
        > + Sized
        + '_,
    > {
        self.cursor.next().await
    }
}
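
/// A hypothetical sketch of draining a [`SnapshotCursor`]: call `next` until
/// it yields `None`, decoding each update. Surfacing decode errors with
/// `expect` is illustrative only.
#[allow(dead_code)]
async fn example_drain_cursor<T: Codec64 + TimelyTimestamp + Lattice + Sync>(
    mut cursor: SnapshotCursor<T>,
) -> Vec<(SourceData, T, StorageDiff)> {
    let mut updates = Vec::new();
    while let Some(batch) = cursor.next().await {
        for ((key, _unit_val), ts, diff) in batch {
            let data = key.expect("decode error while reading snapshot");
            updates.push((data, ts, diff));
        }
    }
    updates
}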

/// Frontiers of a storage collection.
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The id of the collection.
    pub id: GlobalId,

    /// The upper frontier: the frontier up to which the collection has been
    /// written.
    pub write_frontier: Antichain<T>,

    /// The implied capability: the since that is held on behalf of the
    /// collection's read policy.
    pub implied_capability: Antichain<T>,

    /// The overall read frontier: the frontier of all outstanding read
    /// capabilities, including the implied capability and any read holds.
    pub read_capabilities: Antichain<T>,
}
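
/// The expected relationship between the frontiers above, as a hypothetical
/// helper: the overall read frontier includes the implied capability, so it
/// can never be in advance of it.
#[allow(dead_code)]
fn frontiers_are_consistent<T: TimelyTimestamp>(f: &CollectionFrontiers<T>) -> bool {
    PartialOrder::less_equal(&f.read_capabilities, &f.implied_capability)
}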

/// Implementation of [`StorageCollections`] that keeps track of collection
/// state in memory and delegates persist interactions to background tasks.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this instance of the controller.
    envd_epoch: NonZeroI64,

    /// Whether or not this controller is in read-only mode.
    read_only: bool,

    /// Shards that have been dropped and await finalization.
    finalizable_shards: Arc<ShardIdSet>,

    /// Shards that have been finalized but whose finalization has not yet
    /// been durably recorded.
    finalized_shards: Arc<ShardIdSet>,

    /// The state of the collections we know about.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A shared handle for reading from the txn-wal shard.
    txns_read: TxnsRead<T>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn-wal shard as of when we booted.
    initial_txn_upper: Antichain<T>,

    /// The persist location where all storage collections are being written.
    persist_location: PersistLocation,

    /// A cache of persist clients used to access storage collections.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal background task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// For sending read-hold changes to our internal background task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles to tasks we own, so they abort when we are dropped.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    /// Creates and returns a new [`StorageCollections`] implementation,
    /// spawning the background tasks it needs.
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Opening the handle also ensures that the txn-wal shard exists.
        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

    /// Opens the write and since handles for the given shard, advancing the
    /// since to the given value, if provided. In read-only mode a leased read
    /// handle stands in for the critical since handle.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // Ensure the write handle has a recent view of the shard's upper.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

    /// Opens a critical since handle for the given shard, fencing out any
    /// older instance of the controller via `self.envd_epoch`.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                // Ensure the current epoch is not newer than ours, otherwise
                // we have been fenced out.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Atomically attempt to swap in our epoch while
                    // downgrading the since; retry if the opaque value
                    // changed concurrently.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
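
    /// A self-contained model of the fencing protocol above (illustrative
    /// only, not persist's API): a claim succeeds only when our epoch is at
    /// least the recorded one; otherwise we have been fenced by a newer
    /// process and must halt.
    #[allow(dead_code)]
    fn fencing_model(
        current_epoch: &mut NonZeroI64,
        our_epoch: NonZeroI64,
    ) -> Result<(), NonZeroI64> {
        if *current_epoch <= our_epoch {
            // Record our epoch so that older processes observe it and halt.
            *current_epoch = our_epoch;
            Ok(())
        } else {
            // A newer process owns the shard.
            Err(*current_epoch)
        }
    }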

    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        // Delegate to the background task, which owns the persist handles,
        // and await its reply on a oneshot channel.
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }
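
    // The request/response pattern used by `snapshot_stats_inner` above, in
    // miniature (a hypothetical sketch; strings stand in for `BackgroundCmd`
    // and its response):
    //
    //     let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel();
    //     mz_ore::task::spawn(|| "background", async move {
    //         while let Some((cmd, reply)) = cmd_rx.recv().await {
    //             let _ = reply.send(format!("handled: {cmd}"));
    //         }
    //     });
    //     let (tx, rx) = oneshot::channel();
    //     cmd_tx.send(("snapshot_stats".to_string(), tx)).expect("task is live");
    //     assert_eq!(rx.await.expect("task is live"), "handled: snapshot_stats");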

    /// Installs read capabilities on a collection's dependencies, at the
    /// collection's implied capability.
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

    /// Determines a collection's dependencies based on its data source.
    fn determine_collection_dependencies(
        &self,
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependencies = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => Vec::new(),
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                // Ingestion exports depend on their primary source's remap
                // collection.
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => Vec::new(),
                    _ => vec![ingestion.remap_collection_id],
                }
            }
            DataSource::Ingestion(ingestion) => vec![ingestion.remap_collection_id],
            DataSource::Sink { desc } => vec![desc.sink.from],
        };

        Ok(dependencies)
    }

    /// Installs new read capabilities on all of `storage_dependencies`.
    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

    /// Returns a fresh upper for the given collection by opening a write
    /// handle and querying its recent upper.
    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    /// Returns the contents of the given collection at `as_of`, reading
    /// through the txn-wal shard if the collection is registered with it.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // The collection is not registered with txn-wal, so we
                    // can take a snapshot directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // The collection is registered with txn-wal, which
                    // advances the physical upper of the shard lazily, so we
                    // must ask it for a data snapshot that knows how to read
                    // at `as_of`.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.expect("invalid protobuf data").0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    /// Returns a stream of the contents of the given collection at `as_of`.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    // The collection is not registered with txn-wal, so we
                    // can stream from the shard directly.
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream
                .map(|((k, _v), t, d)| {
                    let data = k.expect("error while streaming from Persist");
                    (data, t, d)
                })
                .boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                // Compute the net change: add the new capability, retract the
                // old one.
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

    /// Applies `updates` to the read capabilities of the given collections,
    /// transitively forwarding changes to their dependencies, and sends any
    /// resulting since changes to persist as compaction commands.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Net changes and resulting frontiers, per collection.
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the update with the largest id and apply it,
        // forwarding changes to that collection's dependencies.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Negative updates for an absent collection can be safely
                    // ignored: the collection has already been dropped.
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                    read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                        install read capabilities before the current \
                        frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate the net changes into persist compaction commands.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // If the collection is a table with a new primary, the
                // primary collection is in charge of compacting the shard.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = !matches!(
                    collection.description.data_source,
                    DataSource::Table { primary: Some(_) }
                );

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
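
    /// A miniature of the capability accounting above (illustrative only),
    /// using `u64` times: a [`MutableAntichain`] counts capabilities per time
    /// and its frontier only advances once all counts at earlier times drain
    /// to zero. The net changes it reports are what get forwarded to
    /// dependencies and turned into persist compaction commands.
    #[allow(dead_code)]
    fn capability_accounting_model() {
        let mut caps = MutableAntichain::new();
        // Install two capabilities at time 3 and one at time 5.
        caps.update_iter(vec![(3u64, 2i64), (5, 1)]);
        assert_eq!(caps.frontier().to_owned(), Antichain::from_elem(3));
        // Dropping one capability at 3 leaves the frontier unchanged...
        caps.update_iter(vec![(3, -1)]);
        assert_eq!(caps.frontier().to_owned(), Antichain::from_elem(3));
        // ...but dropping the last one advances the frontier to 5.
        caps.update_iter(vec![(3, -1)]);
        assert_eq!(caps.frontier().to_owned(), Antichain::from_elem(5));
    }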

    /// Stops tracking those shards whose finalization has been durably
    /// recorded in the given metadata.
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl<T> StorageCollections for StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    type Timestamp = T;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        // Determine which collections we do not yet have metadata for.
        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        // Apply dyncfg updates to the persist client's configuration.
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(StorageError::IdentifierMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id: id.clone(),
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(StorageError::IdentifierMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: id.clone(),
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        // If the collection is registered with txn-wal, ask it for a data
        // snapshot that knows how to read at `as_of`.
        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                // The upper is the first timestamp that is not yet complete,
                // so the most recent readable time is one step back from it.
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                // The collection has not been written to yet, so it is
                // definitively empty.
                Vec::new()
            }
            _ => {
                // The upper is the empty antichain: the collection is closed
                // and we cannot derive a read timestamp from it.
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }
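
    // Choosing the read timestamp in `snapshot_latest`, in isolation (a
    // hypothetical sketch): the upper is the first timestamp that is *not*
    // yet complete, so the most recent readable time is one step back.
    //
    //     let upper = mz_repr::Timestamp::from(42u64);
    //     let as_of = upper.step_back().expect("upper is past the minimum");
    //     assert_eq!(as_of, mz_repr::Timestamp::from(41u64));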

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        // See the comments in `Self::snapshot` for why the two cases differ.
        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        // Delete the metadata for any dropped collections and mark their
        // shards as unfinalized, so they eventually get finalized.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        // Record the shards we have finalized since the last call.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state: exact duplicates are
        // harmless, but two different descriptions for the same id are an
        // error.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        // Open the persist handles for all collections concurrently.
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Tables are presented as springing into existence at the
                    // `register_ts` by advancing their sinces; other data
                    // sources need no special handling here.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table { .. } => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        // Register collections in dependency order; see the sketch after this
        // sort for the effect of the ordering.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables first, in reverse id order, so that a new `primary`
            /// table is registered before the older table that refers to it.
            Table(Reverse<GlobalId>),
            /// Then ordinary collections.
            Collection(GlobalId),
            /// Sinks last, since they depend on their `from` collection.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });
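
        // The effect of this ordering, sketched with plain integers (a
        // hypothetical standalone example):
        //
        //     let mut items = vec![
        //         Order::Sink(2),
        //         Order::Table(Reverse(1)),
        //         Order::Collection(4),
        //         Order::Table(Reverse(3)),
        //     ];
        //     items.sort();
        //     // => [Table(Reverse(3)), Table(Reverse(1)), Collection(4), Sink(2)]
        //
        // Table 3 precedes table 1 because of `Reverse`: a newer primary
        // table is registered before an older table that depends on it.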

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies = self
                .determine_collection_dependencies(&*self_collections, &description.data_source)?;

            // A collection's since can never be in advance of its
            // dependency's since, so start at the dependency's since if the
            // shard's own since is behind it.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the primary source to contain this export.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table { .. } => {
                    // Tables registered with txn-wal may have a physical
                    // upper that lags the logical txn upper, so report at
                    // least the initial txn upper as their write frontier.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");
        let collection = self_collections
            .get_mut(&ingestion_id)
            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;

        let curr_ingestion = match &mut collection.description.data_source {
            DataSource::Ingestion(active_ingestion) => active_ingestion,
            _ => unreachable!("verified collection refers to ingestion"),
        };

        curr_ingestion.desc = source_desc;
        debug!("altered {ingestion_id}'s SourceDesc");

        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (source_export_id, new_data_config) in source_exports {
            // Update the data config on the export's own collection
            // description, noting which ingestion it belongs to.
            let source_export_collection = self_collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.description.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // Also update the export embedded in the ingestion's description.
            let ingestion_collection = self_collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.description.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, conn) in source_connections {
            let collection = self_collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.description.data_source {
                DataSource::Ingestion(ingestion) => {
                    // If the connection hasn't changed, there is no sense in
                    // updating it.
                    if ingestion.desc.connection != conn {
                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                    } else {
                        warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            // Only tables can be altered in this way.
            if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        // Evolve the schema of the underlying shard to the new desc.
        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        match schema_result {
            CaESchema::Ok(id) => id,
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        // Open new persist handles for the new collection, which reuses the
        // existing collection's data shard.
        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            // The existing collection becomes a dependent of the new
            // collection, which now owns the shard.
            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            assert!(matches!(
                existing.description.data_source,
                DataSource::Table { primary: None }
            ));

            existing.description.data_source = DataSource::Table {
                primary: Some(new_collection),
            };
            existing.storage_dependencies.push(new_collection);

            // The new collection inherits the existing collection's
            // frontiers; the capability changes installed below hold back its
            // since accordingly.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

            let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: collection_desc.desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                collection_desc,
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }
2392
2393 fn drop_collections_unvalidated(
2394 &self,
2395 storage_metadata: &StorageMetadata,
2396 identifiers: Vec<GlobalId>,
2397 ) {
2398 debug!(?identifiers, "drop_collections_unvalidated");
2399
2400 let mut self_collections = self.collections.lock().expect("lock poisoned");
2401
2402 for id in identifiers.iter() {
2403 let metadata = storage_metadata.get_collection_shard::<T>(*id);
2404 mz_ore::soft_assert_or_log!(
2405 matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2406 "dropping {id}, but drop was not synchronized with storage \
2407 controller via `synchronize_collections`"
2408 );
2409
2410 let dropped_data_source = match self_collections.get(id) {
2411 Some(col) => col.description.data_source.clone(),
2412 None => continue,
2413 };
2414
2415 if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
2418 let ingestion = match self_collections.get_mut(&ingestion_id) {
2420 Some(ingestion) => ingestion,
2421 None => {
2423 tracing::error!(
2424 "primary source {ingestion_id} seemingly dropped before subsource {id}",
2425 );
2426 continue;
2427 }
2428 };
2429
2430 match &mut ingestion.description {
2431 CollectionDescription {
2432 data_source: DataSource::Ingestion(ingestion_desc),
2433 ..
2434 } => {
2435 let removed = ingestion_desc.source_exports.remove(id);
2436 mz_ore::soft_assert_or_log!(
2437 removed.is_some(),
2438 "dropped subsource {id} already removed from source exports"
2439 );
2440 }
2441 _ => unreachable!(
2442 "SourceExport must only refer to primary sources that already exist"
2443 ),
2444 };
2445 }
2446 }
2447
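        // A collection is dropped by setting a read policy that allows its
        // since to advance to the empty frontier; once that happens, the
        // background task drops the persist handles and enqueues the shard
        // for finalization.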
        let mut finalized_policies = Vec::new();

        for id in identifiers {
            if self_collections.contains_key(&id) {
                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
            }
        }
        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        let mut advanced_holds = Vec::new();
        for id in desired_holds.iter() {
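            // Take each hold at the collection's current frontier of
            // accumulated read capabilities, failing if any requested
            // collection is unknown.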
            let collection = collections
                .get(id)
                .ok_or(ReadHoldError::CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);

        let mut result = None;

        while let Some(c) = collection.take() {
            use DataSource::*;
            if let Some(timeline) = &c.description.timeline {
                if *timeline != Timeline::EpochMilliseconds {
                    break;
                }
            }
            match &c.description.data_source {
                Ingestion(ingestion) => {
                    use GenericSourceConnection::*;
                    match ingestion.desc.connection {
                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                            result = Some(TimeDependence::default())
                        }
                        LoadGenerator(_) => {}
                    }
                }
                IngestionExport { ingestion_id, .. } => {
                    let c = collections
                        .get(ingestion_id)
                        .ok_or(CollectionMissing(*ingestion_id))?;
                    collection = Some(c);
                }
                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                    result = Some(TimeDependence::default())
                }
                Other => {}
                Sink { .. } => {}
            };
        }
        Ok(result)
    }
}

#[derive(Debug)]
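/// A wrapper that lets the two kinds of since handles we use, critical
/// [`SinceHandle`]s and leased [`ReadHandle`]s, be used interchangeably.
/// Leased handles carry no opaque epoch, so the epoch-related operations
/// degrade to no-ops for them.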
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}

impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
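                // Leased read handles don't carry an opaque epoch, so we
                // report the minimal one.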
                PersistEpoch(None)
            }
        }
    }

    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

#[derive(Debug, Clone)]
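/// State maintained for each storage collection.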
struct CollectionState<T> {
    pub description: CollectionDescription<T>,

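    /// Accumulation of read capabilities for the collection. This includes
    /// the implied capability as well as capabilities handed out as read
    /// holds.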
    pub read_capabilities: MutableAntichain<T>,

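    /// The implied capability, derived from the collection's read policy
    /// and write frontier.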
    pub implied_capability: Antichain<T>,

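    /// The policy that determines how the implied capability advances in
    /// response to the write frontier.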
    pub read_policy: ReadPolicy<T>,

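    /// Storage identifiers on which this collection depends.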
    pub storage_dependencies: Vec<GlobalId>,

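    /// The most recently reported write frontier of the collection.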
    pub write_frontier: Antichain<T>,

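    /// Metadata for accessing the collection's persist shards.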
    pub collection_metadata: CollectionMetadata,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    pub fn new(
        description: CollectionDescription<T>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            description,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

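    /// Reports whether the collection has been dropped, i.e. it has no
    /// remaining read capabilities.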
    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}

#[derive(Debug)]
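/// A task that runs in the background and owns the persist handles for all
/// registered collections: it watches write frontiers, applies read-hold
/// changes, downgrades sinces, and answers snapshot-stats requests.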
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
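    /// The shard that each registered collection writes to, used to detect
    /// and expire write handles of collections that have since been dropped
    /// or replaced.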
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}

#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}

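/// A boxed future that resolves to the snapshot statistics requested via
/// [`BackgroundCmd::SnapshotStats`].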
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    async fn run(&mut self) {
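        // Futures that resolve when a collection's upper advances past a
        // previously observed frontier, yielding the id, the write handle,
        // and the new upper so the future can be re-armed.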
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register { id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            if is_in_txns {
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }
                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| (id.clone(), c.clone()))
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

    #[instrument(level = "debug")]
    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

            let epoch = since_handle.opaque().clone();
            let result = if new_since.is_empty() {
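                // An empty since means the collection was dropped: perform
                // one final downgrade and then drop our handles for it.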
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);
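
                // Only enqueue the dropped shard for finalization if the
                // `finalize_shards` configuration parameter is enabled.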
                if !self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                    return;
                }

                info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");

                self.finalizable_shards.lock().insert(dropped_shard_id);

                res
            } else {
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
}

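/// Configuration for [`finalize_shards_task`], which finalizes shards of
/// dropped collections in the background.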
struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}

async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        let schemas = persist_client
                            .latest_schema::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                diagnostics.clone(),
                            )
                            .await
                            .expect("codecs have not changed");
                        let (key_schema, val_schema) = match schemas {
                            Some((_, key_schema, val_schema)) => (key_schema, val_schema),
                            None => (RelationDesc::empty(), UnitSchema),
                        };

                        let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(key_schema),
                                    Arc::new(val_schema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");

                        let upper = write_handle.upper();

                        if !upper.is_empty() {
                            let append = write_handle
                                .append(empty_batch, upper.clone(), Antichain::new())
                                .await?;

                            if let Err(e) = append {
                                warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
                                return Ok(());
                            }
                        }
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
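                            // Downgrade using the older of the two epochs:
                            // if ours is newer we avoid fencing the handle's
                            // current user, and if the handle's is newer we
                            // have likely been fenced ourselves, in which
                            // case the compare-and-downgrade below fails
                            // gracefully.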
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                handle_epoch
                            } else {
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            .buffer_unordered(10)
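            // Finalize at most 10 shards concurrently.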
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        {
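                            // Hold both locks together so the finalizable
                            // and finalized sets are updated consistently
                            // with respect to each other.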
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
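    /// Snapshot stats should be computed directly at the given as-of.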
    Direct(Antichain<T>),
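    /// Snapshot stats should be computed via a txn-wal [`DataSnapshot`].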
    Txns(DataSnapshot<T>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

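        // Requesting stats for a collection that was never registered must
        // error.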
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

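        // Requesting stats at an as_of that the upper has not yet passed
        // must not resolve immediately.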
        let stats_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

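        // This request should resolve later, once the upper passes ts 1.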
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

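        // Keep the handle around so the background task stays alive for the
        // whole test.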
        drop(background_task);
    }

    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}