use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;

use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
    SourceExportDataConfig, Timeline,
};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

#[async_trait]
pub trait StorageCollections: Debug + Sync {
    type Timestamp: TimelyTimestamp;

    /// Reconciles this `StorageCollections` with durable state on boot. The
    /// given `init_ids` are the collections that must exist after startup.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Applies updated configuration parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the [`CollectionMetadata`] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns the metadata of all collections that have not been dropped.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the collection identified by `id`.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Returns the frontiers of all the given collections.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;

    /// Returns the frontiers of all collections that have not been dropped.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    /// Checks whether a collection exists under the given `GlobalId`.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the collection
    /// identified by `id` at the given frontier.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    /// Returns statistics about the individual parts backing the collection
    /// identified by `id` at the given frontier.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    /// Returns the contents of the collection identified by `id` as of `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    /// Returns the contents of the collection identified by `id` at the most
    /// recent readable timestamp.
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    /// Returns a cursor over the contents of the collection as of `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    /// Returns the contents of the collection as of `as_of`, as a stream.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    /// Creates a [`TimestamplessUpdateBuilder`] that can be used to stage
    /// updates for the collection identified by `id`.
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    /// Adds and removes collections from durable state, allocating new shards
    /// and retiring dropped ones as needed.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates the described collections, for use during bootstrap.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the `SourceDesc` of the identified ingestion.
    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data configs of the identified source exports.
    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the connections of the identified ingestions.
    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Evolves the schema of an existing table collection to `new_desc`,
    /// registering the result under `new_collection`.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the given collections without validating the identifiers.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Installs the given read policies, which determine how each
    /// collection's implied capability follows its write frontier.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    /// Acquires and returns read holds at the current since of each of the
    /// given collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;

    /// Determines the time dependence of the identified collection, if any.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
}
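
// A minimal usage sketch of the trait above (illustrative only, not part of
// the original API surface; `sc` is assumed to be some implementation of
// `StorageCollections`). A caller typically pins the since with a read hold,
// picks an `as_of` from the reported frontiers, and then reads a snapshot:
//
//     let _holds = sc.acquire_read_holds(vec![id])?;
//     let frontiers = sc.collection_frontiers(id)?;
//     let as_of = frontiers
//         .read_capabilities
//         .as_option()
//         .cloned()
//         .expect("since is not the empty antichain");
//     let rows = sc.snapshot(id, as_of).await?;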

pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<
        impl Iterator<
            Item = (
                (Result<SourceData, String>, Result<(), String>),
                T,
                StorageDiff,
            ),
        > + Sized
        + '_,
    > {
        self.cursor.next().await
    }
}
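
// Illustrative sketch (not from the original file) of draining a
// `SnapshotCursor` obtained from `StorageCollections::snapshot_cursor`:
//
//     let mut cursor = sc.snapshot_cursor(id, as_of).await?;
//     while let Some(batch) = cursor.next().await {
//         for ((key, _val), t, diff) in batch {
//             let data = key.expect("decodable SourceData");
//             // ...process (data, t, diff)...
//         }
//     }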

/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The identifier of the collection.
    pub id: GlobalId,

    /// The write frontier, aka. upper, of the collection.
    pub write_frontier: Antichain<T>,

    /// The implied capability, i.e. the since held on behalf of the
    /// collection's read policy.
    pub implied_capability: Antichain<T>,

    /// The overall read capabilities, i.e. the frontier of all outstanding
    /// read holds on the collection.
    pub read_capabilities: Antichain<T>,
}

#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    envd_epoch: NonZeroI64,

    read_only: bool,

    finalizable_shards: Arc<ShardIdSet>,

    finalized_shards: Arc<ShardIdSet>,

    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    txns_read: TxnsRead<T>,

    config: Arc<Mutex<StorageConfiguration>>,

    initial_txn_upper: Antichain<T>,

    persist_location: PersistLocation,

    persist: Arc<PersistClientCache>,

    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;
            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

    fn determine_collection_dependencies(
        &self,
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => (),
                    _ => dependencies.push(ingestion.remap_collection_id),
                }
            }
            DataSource::Ingestion(ingestion) => {
                if ingestion.remap_collection_id != source_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
        }

        Ok(dependencies)
    }

    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.expect("invalid protobuf data").0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream
                .map(|((k, _v), t, d)| {
                    let data = k.expect("error while streaming from Persist");
                    (data, t, d)
                })
                .boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }
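
    // Illustrative note (not in the original source): a `ReadPolicy` maps the
    // current write frontier to a new implied capability. For example,
    // `ReadPolicy::ValidFrom(Antichain::new())`, as installed by
    // `drop_collections_unvalidated` below, maps every write frontier to the
    // empty frontier, so the implied capability is released and the
    // collection becomes eligible for removal once all other read holds drop.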

    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        let mut collections_net = BTreeMap::new();

        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = collection.description.primary.is_none();

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
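
    // Worked example (illustrative, not from the original source) of the
    // capability arithmetic above: if a collection's `read_capabilities`
    // holds counts `{10: 2}` and the update `{10: -1, 20: +1}` arrives,
    // `update_iter` nets the counts to `{10: 1, 20: 1}` while the frontier
    // stays at `[10]`, so no net change propagates and no compaction command
    // is emitted. Only when the count at `10` reaches zero does the frontier
    // advance to `[20]`; at that point the net change flows to storage
    // dependencies and a `DowngradeSince` goes to the background task.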

    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl<T> StorageCollections for StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    type Timestamp = T;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(CollectionMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id: id.clone(),
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(CollectionMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: id.clone(),
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                Vec::new()
            }
            _ => {
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e.into()) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }
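
        // Illustrative note (not in the original source): `dedup` only drops
        // entries that are equal in both id and description, so e.g.
        // `[(u1, desc_a), (u1, desc_b)]` survives dedup and the adjacent-pair
        // scan above then reports `CollectionIdReused(u1)`.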

        {
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            Table(Reverse<GlobalId>),
            Collection(GlobalId),
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });
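
        // Illustrative example of the resulting order (hypothetical ids): for
        // tables `u1` and `u3`, a source `u2`, and a sink `u4`, registration
        // proceeds as `u3, u1, u2, u4`. Tables come first in *descending* id
        // order because a table's dependency (its `primary`, when set) is a
        // newer version of the table, and sinks come last because they depend
        // on other collections.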

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies =
                self.determine_collection_dependencies(&*self_collections, id, &description)?;

            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");
        let collection = self_collections
            .get_mut(&ingestion_id)
            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;

        let curr_ingestion = match &mut collection.description.data_source {
            DataSource::Ingestion(active_ingestion) => active_ingestion,
            _ => unreachable!("verified collection refers to ingestion"),
        };

        curr_ingestion.desc = source_desc;
        debug!("altered {ingestion_id}'s SourceDesc");

        Ok(())
    }

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (source_export_id, new_data_config) in source_exports {
            let source_export_collection = self_collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.description.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            let ingestion_collection = self_collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.description.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    if source_export.data_config != new_data_config {
                        tracing::info!(
                            ?source_export_id,
                            from = ?source_export.data_config,
                            to = ?new_data_config,
                            "alter_ingestion_export_data_configs, updating"
                        );
                        source_export.data_config = new_data_config;
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                            export {source_export_id} of {ingestion_id} but \
                            the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, conn) in source_connections {
            let collection = self_collections
                .get_mut(&id)
                .ok_or_else(|| StorageError::IdentifierMissing(id))?;

            match &mut collection.description.data_source {
                DataSource::Ingestion(ingestion) => {
                    if ingestion.desc.connection != conn {
                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
                        ingestion.desc.connection = conn;
                    } else {
                        warn!(
                            "update_source_connection called on {id} but the \
                            connection was the same"
                        );
                    }
                }
                o => {
                    warn!("update_source_connection called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(id))?;
                }
            }
        }

        Ok(())
    }

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            if existing.description.data_source != DataSource::Table {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        match schema_result {
            CaESchema::Ok(id) => id,
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            assert_eq!(existing.description.data_source, DataSource::Table);
            assert_none!(existing.description.primary);

            existing.description.primary = Some(new_collection);
            existing.storage_dependencies.push(new_collection);

            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

            let collection_desc = CollectionDescription::for_table(new_desc.clone());
            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: collection_desc.desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                collection_desc,
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for id in identifiers.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let dropped_data_source = match self_collections.get(id) {
                Some(col) => col.description.data_source.clone(),
                None => continue,
            };

            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
                let ingestion = match self_collections.get_mut(&ingestion_id) {
                    Some(ingestion) => ingestion,
                    None => {
                        tracing::error!(
                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
                        );
                        continue;
                    }
                };

                match &mut ingestion.description {
                    CollectionDescription {
                        data_source: DataSource::Ingestion(ingestion_desc),
                        ..
                    } => {
                        let removed = ingestion_desc.source_exports.remove(id);
                        mz_ore::soft_assert_or_log!(
                            removed.is_some(),
                            "dropped subsource {id} already removed from source exports"
                        );
                    }
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };
            }
        }

        let mut finalized_policies = Vec::new();

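        // An empty `ValidFrom` policy releases the implied capability. Once
        // any outstanding read holds are dropped as well, the since can
        // advance to the empty antichain and the shard becomes eligible for
        // finalization.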
        for id in identifiers {
            if self_collections.contains_key(&id) {
                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
            }
        }
        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        let mut advanced_holds = Vec::new();
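        // Take each hold at the collection's current since, i.e. the frontier
        // of all outstanding read capabilities, so the hold is guaranteed to
        // still be valid.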
        for id in desired_holds.iter() {
            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);

        let mut result = None;

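        // Walk the dependency chain: ingestion exports defer to their primary
        // ingestion; sources that follow wall-clock time, tables, webhooks,
        // and introspection collections get the default `TimeDependence`; and
        // anything else (or a non-epoch timeline) yields `None`.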
        while let Some(c) = collection.take() {
            use DataSource::*;
            if let Some(timeline) = &c.description.timeline {
                if *timeline != Timeline::EpochMilliseconds {
                    break;
                }
            }
            match &c.description.data_source {
                Ingestion(ingestion) => {
                    use GenericSourceConnection::*;
                    match ingestion.desc.connection {
                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                            result = Some(TimeDependence::default())
                        }
                        LoadGenerator(_) => {}
                    }
                }
                IngestionExport { ingestion_id, .. } => {
                    let c = collections
                        .get(ingestion_id)
                        .ok_or(CollectionMissing(*ingestion_id))?;
                    collection = Some(c);
                }
                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                    result = Some(TimeDependence::default())
                }
                Other => {}
                Sink { .. } => {}
            };
        }
        Ok(result)
    }
}

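/// Abstracts over the two kinds of since handles we hold for a collection's
/// shard: a critical (durable, epoch-fenced) `SinceHandle`, or a leased
/// `ReadHandle` whose since is maintained only as long as the handle lives.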
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}

impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => PersistEpoch(None),
        }
    }

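    // Leased handles carry no opaque epoch, so the compare-and-downgrade
    // variants below assert that the caller passes the `None` epoch and then
    // downgrade unconditionally.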
    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

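/// In-memory state we maintain for a single collection: its description, the
/// capability accounting that determines its since, its write frontier, and
/// its persist metadata.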
#[derive(Debug, Clone)]
struct CollectionState<T> {
    pub description: CollectionDescription<T>,

    pub read_capabilities: MutableAntichain<T>,

    pub implied_capability: Antichain<T>,

    pub read_policy: ReadPolicy<T>,

    pub storage_dependencies: Vec<GlobalId>,

    pub write_frontier: Antichain<T>,

    pub collection_metadata: CollectionMetadata,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    pub fn new(
        description: CollectionDescription<T>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            description,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}

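/// A task that runs in the background, tracking the write frontiers of
/// registered shards and downgrading sinces in response, driven by commands
/// from `StorageCollectionsImpl` and by read-hold changes arriving on a
/// channel.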
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}

#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}

pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    async fn run(&mut self) {
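        // One future per registered (non-txns) shard, each resolving once
        // that shard's upper advances past the previously observed frontier.
        // Collections that live in the txns shard are instead driven by a
        // single future on the txns shard's upper.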
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register { id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            if is_in_txns {
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }
                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
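                    // Greedily batch up all immediately available hold
                    // changes, so we take the collections lock once per batch
                    // rather than once per message.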
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| (id.clone(), c.clone()))
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

    #[instrument(level = "debug")]
    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

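            // Express the capability change as a ChangeBatch: +1 for each
            // time in the new capability, then swap it into place and -1 for
            // each time in the old one, leaving the net difference.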
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

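            // An empty since means the collection was dropped: downgrade all
            // the way, retire our handles, and (if enabled) enqueue the shard
            // for finalization.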
            let epoch = since_handle.opaque();
            let result = if new_since.is_empty() {
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if !self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                    return;
                }

                info!(%id, %dropped_shard_id, "enqueuing shard finalization due to dropped collection and dropped persist handle");

                self.finalizable_shards.lock().insert(dropped_shard_id);

                res
            } else {
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
}

struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}

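/// Task that finalizes shards enqueued in `finalizable_shards`: it advances
/// each shard's upper to the empty antichain (and, if configured, forces the
/// critical since down as well) before asking persist to finalize the shard.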
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
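                            // Downgrade using the older of the two epochs: if
                            // the handle's epoch is newer than ours, a newer
                            // envd has taken over, and the
                            // compare_and_downgrade_since below then fails
                            // gracefully instead of fencing it out.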
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                handle_epoch
                            } else {
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

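/// The as-of at which to compute snapshot stats: either directly against the
/// shard, or through txn-wal for collections that live in the txns shard.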
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    Direct(Antichain<T>),
    Txns(DataSnapshot<T>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

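        // Stats for an unregistered collection should error immediately,
        // while stats at an as-of that is past the current upper should stay
        // pending.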
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        let stats_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

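        // Now that the upper has passed ts 1, the earlier pending stats
        // future can resolve, and it sees both updates.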
        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        drop(background_task);
    }

    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}