use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

#[async_trait]
pub trait StorageCollections: Debug + Sync {
    type Timestamp: TimelyTimestamp;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn update_parameters(&self, config_params: StorageParameters);

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
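
// A minimal, test-only usage sketch of the trait above (hypothetical caller,
// never executed): acquire a read hold to pin the since of a collection, then
// read a snapshot at a caller-chosen `as_of` that the hold keeps readable.
// `into_element` comes from `mz_ore::collections::CollectionExt`; error
// handling is simplified for illustration.
#[cfg(test)]
mod storage_collections_usage_sketch {
    use super::*;

    #[allow(dead_code)]
    async fn snapshot_pinned<SC: StorageCollections>(
        sc: &SC,
        id: GlobalId,
        as_of: SC::Timestamp,
    ) -> Result<Vec<(Row, StorageDiff)>, StorageError<SC::Timestamp>> {
        // Keeping `_hold` alive prevents the since from advancing past its
        // frontier, so `as_of` stays readable until we are done.
        let _hold = sc
            .acquire_read_holds(vec![id])
            .expect("collection exists")
            .into_element();
        sc.snapshot(id, as_of).await
    }
}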

pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
        let iter = self.cursor.next().await?;
        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
    }
}

#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The id of the collection.
    pub id: GlobalId,

    /// The upper frontier: writes at times not beyond this frontier have not
    /// yet happened.
    pub write_frontier: Antichain<T>,

    /// The since implied by the collection's read policy alone, absent any
    /// outstanding read holds.
    pub implied_capability: Antichain<T>,

    /// The overall since, accounting for the implied capability and all
    /// outstanding read holds.
    pub read_capabilities: Antichain<T>,
}
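
// A self-contained, test-only sketch (plain `u64` timestamps, only timely's
// public API) of how the three frontiers above relate: the overall read
// frontier is never in advance of the implied capability, and a time is
// readable when it is at or past the since but not yet at the upper.
#[cfg(test)]
mod collection_frontiers_sketch {
    use timely::PartialOrder;
    use timely::progress::Antichain;

    #[test]
    fn frontier_relationships() {
        // Hypothetical frontiers for a single collection.
        let read_capabilities = Antichain::from_elem(3u64);
        let implied_capability = Antichain::from_elem(5u64);
        let write_frontier = Antichain::from_elem(9u64);

        // Outstanding read holds can only be at or before the implied
        // capability, so the overall read frontier is less-or-equal to it.
        assert!(PartialOrder::less_equal(
            &read_capabilities,
            &implied_capability
        ));

        // Time 7 is readable: at or past the overall since...
        assert!(read_capabilities.less_equal(&7));
        // ...and not yet at the write frontier.
        assert!(!write_frontier.less_equal(&7));
    }
}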

#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    envd_epoch: NonZeroI64,

    read_only: bool,

    finalizable_shards: Arc<ShardIdSet>,

    finalized_shards: Arc<ShardIdSet>,

    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    txns_read: TxnsRead<T>,

    config: Arc<Mutex<StorageConfiguration>>,

    initial_txn_upper: Antichain<T>,

    persist_location: PersistLocation,

    persist: Arc<PersistClientCache>,

    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
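
    // The fencing check above, in isolation (hypothetical epochs, not tied to
    // any persist state): an unset opaque (`None`) or an epoch at or below
    // ours means this process may claim the handle; a strictly greater epoch
    // means a newer `environmentd` generation owns it and we halt.
    //
    //     let ours: i64 = 5;
    //     assert!(None::<i64>.map(|e| e <= ours).unwrap_or(true)); // unset: claimable
    //     assert!(Some(5i64).map(|e| e <= ours).unwrap_or(true)); // same epoch: ok
    //     assert!(!Some(6i64).map(|e| e <= ours).unwrap_or(true)); // newer: fenced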

    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }
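
    // `handle.since().join(provided_since)` above is a lattice join: the
    // element-wise least upper bound of the two frontiers, so the handle's
    // since never regresses below where the shard actually is. With
    // total-order times the join of two one-element frontiers is just the
    // larger element (sketch, `u64` times):
    //
    //     join({5}, {3}) == {5}
    //     join({5}, {8}) == {8}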

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

    fn determine_collection_dependencies(
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => (),
                    _ => dependencies.push(*remap_collection_id),
                }
            }
            DataSource::Ingestion(ingestion) => {
                if ingestion.remap_collection_id != source_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
        }

        Ok(dependencies)
    }

    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }
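
    // How the `ChangeBatch` built above encodes capability changes, in
    // isolation (self-contained sketch over timely's public API, made-up
    // times): +1 per frontier element installs a capability, -1 releases it,
    // and offsetting updates consolidate away.
    //
    //     use timely::progress::ChangeBatch;
    //     let mut changes = ChangeBatch::new();
    //     changes.update(5u64, 1); // install a capability at time 5
    //     changes.update(5u64, -1); // ...and release it again
    //     assert!(changes.is_empty()); // nets out to no change at all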

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }
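
    // The `extend`/`swap`/`extend` dance above computes the capability diff
    // in one pass: +1s for the new frontier, swap it into place, then -1s for
    // what is now the old frontier. In isolation (sketch, `u64` times):
    //
    //     use timely::progress::{Antichain, ChangeBatch};
    //     let mut implied = Antichain::from_elem(3u64);
    //     let mut new_cap = Antichain::from_elem(7u64);
    //     let mut update = ChangeBatch::new();
    //     update.extend(new_cap.iter().map(|t| (*t, 1)));
    //     std::mem::swap(&mut implied, &mut new_cap);
    //     update.extend(new_cap.iter().map(|t| (*t, -1)));
    //     // `update` now reads "assert 7, retract 3" and `implied` is {7}.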

    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        let mut collections_net = BTreeMap::new();

        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                    read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                        install read capabilities before the current \
                        frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = collection.primary.is_none();

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
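
    // `MutableAntichain::update_iter` above applies counted capability
    // changes and yields only the *net* frontier changes, which is exactly
    // what gets forwarded to storage dependencies. A self-contained sketch
    // (timely public API, arbitrary times):
    //
    //     use timely::progress::frontier::MutableAntichain;
    //     let mut caps = MutableAntichain::new();
    //     caps.update_iter([(3u64, 1)]); // hold at 3; frontier becomes {3}
    //     let net: Vec<_> = caps.update_iter([(3u64, -1), (5u64, 1)]).collect();
    //     // `net` records the frontier moving from 3 to 5 (retract 3,
    //     // assert 5); `caps.frontier()` is now {5}.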

    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl<T> StorageCollections for StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    type Timestamp = T;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(CollectionMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id: id.clone(),
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(CollectionMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: id.clone(),
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => Vec::new(),
            _ => {
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e.into()) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            Table(Reverse<GlobalId>),
            Collection(GlobalId),
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                LoadGenerator(_) => None,
                            }
                        }
                        IngestionExport { ingestion_id, .. } => {
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            match &description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport { .. } => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
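
    // Registration order above, spelled out (sketch with hypothetical ids):
    // `DependencyOrder`'s derived `Ord` sorts tables first in *reverse* id
    // order, so that a newer table created by `alter_table_desc` (which acts
    // as the `primary` of an older one) is registered before the older table
    // that depends on it; then all other collections in id order; then sinks
    // last, since a sink depends on the collection it reads from.
    //
    //     Table(Reverse(u7)) < Table(Reverse(u2)) < Collection(u3) < Sink(u1)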

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        match schema_result {
            CaESchema::Ok(id) => id,
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            assert_none!(existing.primary);

            existing.primary = Some(new_collection);
            existing.storage_dependencies.push(new_collection);

            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: new_desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                None,
                existing.time_dependence.clone(),
                existing.ingestion_remap_collection_id.clone(),
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for id in identifiers.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );
        }

        let mut finalized_policies = Vec::new();

        for id in identifiers {
            if self_collections.contains_key(&id) {
                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
            }
        }
        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        let mut advanced_holds = Vec::new();
        for id in desired_holds.iter() {
            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
        Ok(state.time_dependence.clone())
    }

    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            envd_epoch,
            read_only,
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read: _,
            config,
            initial_txn_upper,
            persist_location,
            persist: _,
            cmd_tx: _,
            holds_tx: _,
            _background_task: _,
            _finalize_shards_task: _,
        } = self;

        let finalizable_shards: Vec<_> = finalizable_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let finalized_shards: Vec<_> = finalized_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let collections: BTreeMap<_, _> = collections
            .lock()
            .expect("poisoned")
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let config = format!("{:?}", config.lock().expect("poisoned"));

        Ok(serde_json::json!({
            "envd_epoch": envd_epoch,
            "read_only": read_only,
            "finalizable_shards": finalizable_shards,
            "finalized_shards": finalized_shards,
            "collections": collections,
            "config": config,
            "initial_txn_upper": initial_txn_upper,
            "persist_location": format!("{persist_location:?}"),
        }))
    }
}
2426
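/// A wrapper around either a critical [SinceHandle] or a leased
/// [ReadHandle], letting the rest of the code hold back a shard's since
/// without caring which flavor of handle it holds.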
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}

impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
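                // Leased handles carry no opaque fencing token, so we report
                // an absent epoch.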
                PersistEpoch(None)
            }
        }
    }

    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

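    /// Fetches aggregate statistics about the shard's contents at the given
    /// `as_of`, reporting a `ReadBeforeSince` error when the `as_of` is not
    /// readable anymore.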
    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

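    /// Like [Self::snapshot_stats], but resolves the read timestamp through a
    /// txn-wal [DataSnapshot], for collections whose writes go through the
    /// txns shard.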
    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

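/// Per-collection state: the accumulation of all outstanding read
/// capabilities, the read capability implied by the current read policy,
/// the policy itself, the collection's storage dependencies, its write
/// frontier, and its [CollectionMetadata].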
#[derive(Debug, Clone)]
struct CollectionState<T> {
    primary: Option<GlobalId>,

    time_dependence: Option<TimeDependence>,

    ingestion_remap_collection_id: Option<GlobalId>,

    pub read_capabilities: MutableAntichain<T>,

    pub implied_capability: Antichain<T>,

    pub read_policy: ReadPolicy<T>,

    pub storage_dependencies: Vec<GlobalId>,

    pub write_frontier: Antichain<T>,

    pub collection_metadata: CollectionMetadata,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    pub fn new(
        primary: Option<GlobalId>,
        time_dependence: Option<TimeDependence>,
        ingestion_remap_collection_id: Option<GlobalId>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            primary,
            time_dependence,
            ingestion_remap_collection_id,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

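    /// Reports whether the collection was dropped: once all read capabilities
    /// are gone, nothing can read from the collection anymore.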
    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}

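/// A task that runs in the background, watching shard uppers, applying
/// read-hold changes, and downgrading sinces on behalf of the storage
/// collections implementation.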
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}

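/// A newtype around the future that eventually yields snapshot stats, so it
/// can be sent over a channel and given a [Debug] impl.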
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    async fn run(&mut self) {
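        // One in-flight future per registered (non-txns) collection, each
        // waiting for that collection's shard upper to advance.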
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

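        // Returns a future that resolves once the handle's upper advances
        // past `prev_upper`, yielding the id, the handle, and the new upper.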
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| async move {
                soft_assert_or_log!(
                    !prev_upper.is_empty(),
                    "cannot await progress when upper is already empty"
                );
                handle.wait_for_upper_past(&prev_upper).await;
                let new_upper = handle.shared_upper();
                (id, handle, new_upper)
            };

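        // The txns shard gets a single dedicated future; when its upper
        // advances, the new upper is fanned out to every collection in
        // `txns_shards`. The transient id is only a label for that future,
        // not a real collection.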
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
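                            // The handle's shard no longer backs this
                            // collection (it was re-created since), so stop
                            // tracking the handle and expire it.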
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register { id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            if is_in_txns {
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }
                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
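                            // Resolve the request against the collection's
                            // since handle, answering with an error future
                            // when the collection is unknown. The response is
                            // itself a future, so this task never blocks on
                            // the stats fetch.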
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            let _ = tx.send(res);
                        }
                    }
                }
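                // Greedily drain the holds channel and batch all pending
                // changes into one application, so we take the collections
                // lock only once.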
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| (id.clone(), c.clone()))
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

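    /// Ingests new write frontiers and, for any collection whose frontier
    /// advanced, recomputes the read capability implied by its read policy,
    /// batching up the resulting capability changes.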
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

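    /// Applies the given since downgrades to the per-collection since
    /// handles, retiring the handles of any collection whose since has
    /// advanced to the empty antichain.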
    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

            let epoch = since_handle.opaque();
            let result = if new_since.is_empty() {
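                // An empty since means the collection was dropped: do one
                // final downgrade and release the persist handles.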
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if !self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                    return;
                }

                info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");

                self.finalizable_shards.lock().insert(dropped_shard_id);

                res
            } else {
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
}

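/// Configuration for the task that periodically finalizes dropped shards;
/// see [finalize_shards_task].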
struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards =
            { finalizable_shards.lock().iter().cloned().collect_vec() };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

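                        // Seal the shard against new writes by advancing its
                        // upper to the empty antichain.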
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
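                            // Downgrade with whichever epoch is older: if
                            // ours is newer, reuse the handle's current epoch
                            // so the compare-and-downgrade succeeds;
                            // otherwise use ours, which fails (and we back
                            // off below) when a newer environment has taken
                            // over the shard.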
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                handle_epoch
                            } else {
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
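            // Finalize at most 10 shards concurrently.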
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

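/// The as-of frontier to use when computing snapshot stats: either a plain
/// frontier (`Direct`) or a txn-wal [DataSnapshot] for collections in the
/// txns shard (`Txns`).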
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    Direct(Antichain<T>),
    Txns(DataSnapshot<T>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

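        // Stats for a collection that was never registered must report an
        // error.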
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

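        // A request at an as_of that the shard's upper has not yet passed
        // must not resolve immediately.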
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

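        // Keep a second not-yet-ready request in flight; it should resolve
        // once the upper passes ts 1.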
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

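        // Write one update at ts 0 and advance the upper to 1, making stats
        // at as_of 0 available.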
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        drop(background_task);
    }

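    /// Sends a [BackgroundCmd::SnapshotStats] for `id` at `as_of` and awaits
    /// both the response and the stats future it carries.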
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}