use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;

use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
    SourceExportDataConfig, Timeline,
};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

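/// High-level interface for tracking storage collections and the frontiers of
/// the persist shards that back them.
///
/// Implementors keep, for each collection, its metadata (the shard mapping),
/// its write frontier, and its read capabilities, and let callers take out
/// read holds, take snapshots, and alter or drop collections.
///
/// A minimal usage sketch (not compiled; assumes an implementor `collections`
/// and a steppable timestamp type):
///
/// ```ignore
/// // Hold back compaction on `id`, then read a consistent snapshot of it.
/// let hold = collections.acquire_read_holds(vec![id])?.into_element();
/// let frontiers = collections.collection_frontiers(id)?;
/// let as_of = frontiers.write_frontier.as_option().unwrap().step_back().unwrap();
/// let contents = collections.snapshot(id, as_of).await?;
/// drop(hold); // releases the read hold again
/// ```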
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    type Timestamp: TimelyTimestamp;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn update_parameters(&self, config_params: StorageParameters);

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}

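/// A cursor over a snapshot of a collection, as returned by
/// [`StorageCollections::snapshot_cursor`].
///
/// Holds on to its [`ReadHandle`] for as long as it lives, so that the leases
/// backing the cursor's data stay valid. A minimal consumption sketch (not
/// compiled):
///
/// ```ignore
/// let mut cursor = collections.snapshot_cursor(id, as_of).await?;
/// while let Some(batch) = cursor.next().await {
///     for (data, t, diff) in batch {
///         // process one update
///     }
/// }
/// ```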
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
        let iter = self.cursor.next().await?;
        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
    }
}

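/// A point-in-time view of a collection's frontiers: `write_frontier` is the
/// upper of the backing shard, `implied_capability` is the read capability
/// held on behalf of the collection's read policy, and `read_capabilities` is
/// the overall read frontier implied by all outstanding capabilities.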
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    pub id: GlobalId,

    pub write_frontier: Antichain<T>,

    pub implied_capability: Antichain<T>,

    pub read_capabilities: Antichain<T>,
}

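/// Implementation of [`StorageCollections`] that keeps an in-memory map of
/// collection state, delegates frontier maintenance and since downgrades to a
/// background task (via `cmd_tx`/`holds_tx`), and runs a separate task that
/// finalizes dropped shards. Cheaply cloneable: clones share the same
/// underlying state.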
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    envd_epoch: NonZeroI64,
    read_only: bool,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    txns_read: TxnsRead<T>,
    config: Arc<Mutex<StorageConfiguration>>,
    initial_txn_upper: Antichain<T>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
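    /// Creates and initializes a new [`StorageCollectionsImpl`], spawning its
    /// background frontier-tracking task and its shard-finalization task
    /// (both are shut down when the returned value is dropped).
    ///
    /// Panics if the txn-wal shard has not yet been allocated in the given
    /// [`StorageTxn`]'s durable state, i.e. if the preparatory initialization
    /// has not run yet.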
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

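    /// Opens the persist handles for the given shard: a write handle plus a
    /// since handle, which is a leased read handle in read-only mode and a
    /// critical [`SinceHandle`] otherwise. Also fetches the shard's recent
    /// upper, so the returned write handle reports a fresh frontier.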
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

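    /// Opens a critical since handle for `shard`, fencing out older
    /// environments via the opaque epoch: if a newer envd epoch is already
    /// installed we halt, otherwise we install our own epoch and downgrade
    /// the since to at least the provided one.
    ///
    /// Panics when called in read-only mode; read-only processes must use
    /// leased read handles instead.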
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

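    /// Opens a leased read handle for `shard` and eagerly downgrades it to
    /// the provided since (or to the shard's since joined with the minimum
    /// timestamp when no since is given).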
    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

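    /// Installs read holds for collection `id` on all of its storage
    /// dependencies, at the collection's implied capability. Logs (softly
    /// asserts) if a dependency's since is already beyond that capability.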
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability.clone()),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    &collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability,
        )?;

        Ok(())
    }

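    /// Determines the given collection's storage dependencies, if any: its
    /// primary collection (when one is set in the description), the remap
    /// collection for ingestions and their exports (except CDCv2 exports),
    /// and the upstream collection for sinks.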
    fn determine_collection_dependencies(
        &self,
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => (),
                    _ => dependencies.push(ingestion.remap_collection_id),
                }
            }
            DataSource::Ingestion(ingestion) => {
                if ingestion.remap_collection_id != source_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
        }

        Ok(dependencies)
    }

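    /// Installs the given read capability on each of `storage_dependencies`,
    /// by applying a `+1` change at each time of the capability and routing
    /// the resulting changes through `update_read_capabilities_inner`.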
    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

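    /// Applies `updates` to the read capabilities of the tracked collections
    /// and sends any resulting since changes to the background task, which
    /// downgrades the persist sinces.
    ///
    /// Updates are propagated transitively: the loop repeatedly takes the
    /// largest remaining id, applies its changes, and forwards the resulting
    /// changes to that collection's storage dependencies by re-queuing them
    /// in `updates`, until no updates remain. Collections whose overall read
    /// frontier becomes the empty antichain are removed from the state map.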
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        let mut collections_net = BTreeMap::new();

        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                    read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                        install read capabilities before the current \
                        frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = collection.description.primary.is_none();

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }

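    /// Prunes our set of finalized shards down to those that the durable
    /// metadata still lists as unfinalized, i.e. forgets shards whose
    /// finalization has already been recorded durably.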
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl<T> StorageCollections for StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    type Timestamp = T;

    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(CollectionMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id: id.clone(),
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(CollectionMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: id.clone(),
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => Vec::new(),
            _ => {
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e.into()) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        txn.insert_unfinalized_shards(dropped_shards)?;

        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

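    // Creates the collection states for bootstrap: opens persist handles for
    // every collection (concurrently), advances the initial since of new
    // tables to `register_ts`, registers the handles with the background
    // task, and installs read holds on dependencies.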
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

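        // Register collections in dependency order: tables first (in reverse
        // id order, since an altered table's original collection depends on
        // its replacement collection, which has a larger id), then other
        // collections, then sinks, which depend on collections. The derived
        // `Ord` on this enum encodes exactly that ordering.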
1943 #[derive(Ord, PartialOrd, Eq, PartialEq)]
1945 enum DependencyOrder {
1946 Table(Reverse<GlobalId>),
1948 Collection(GlobalId),
1950 Sink(GlobalId),
1952 }
1953 to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
1954 DataSource::Table => DependencyOrder::Table(Reverse(*id)),
1955 DataSource::Sink { .. } => DependencyOrder::Sink(*id),
1956 _ => DependencyOrder::Collection(*id),
1957 });
1958
1959 let mut self_collections = self.collections.lock().expect("lock poisoned");
1962
1963 for (id, description, write_handle, since_handle, metadata) in to_register {
1964 let write_frontier = write_handle.upper();
1965 let data_shard_since = since_handle.since().clone();
1966
1967 let storage_dependencies =
1969 self.determine_collection_dependencies(&*self_collections, id, &description)?;
1970
1971 let initial_since = match storage_dependencies
1973 .iter()
1974 .at_most_one()
1975 .expect("should have at most one dependency")
1976 {
1977 Some(dep) => {
1978 let dependency_collection = self_collections
1979 .get(dep)
1980 .ok_or(StorageError::IdentifierMissing(*dep))?;
1981 let dependency_since = dependency_collection.implied_capability.clone();
1982
1983 if PartialOrder::less_than(&data_shard_since, &dependency_since) {
1994 mz_ore::soft_assert_or_log!(
2013 write_frontier.elements() == &[T::minimum()]
2014 || write_frontier.is_empty()
2015 || PartialOrder::less_than(&dependency_since, write_frontier),
2016 "dependency ({dep}) since has advanced past dependent ({id}) upper \n
2017 dependent ({id}): since {:?}, upper {:?} \n
2018 dependency ({dep}): since {:?}",
2019 data_shard_since,
2020 write_frontier,
2021 dependency_since
2022 );
2023
2024 dependency_since
2025 } else {
2026 data_shard_since
2027 }
2028 }
2029 None => data_shard_since,
2030 };
2031
2032 let mut collection_state = CollectionState::new(
2033 description,
2034 initial_since,
2035 write_frontier.clone(),
2036 storage_dependencies,
2037 metadata.clone(),
2038 );
2039
2040 match &collection_state.description.data_source {
2042 DataSource::Introspection(_) => {
2043 self_collections.insert(id, collection_state);
2044 }
2045 DataSource::Webhook => {
2046 self_collections.insert(id, collection_state);
2047 }
2048 DataSource::IngestionExport {
2049 ingestion_id,
2050 details,
2051 data_config,
2052 } => {
2053 let source_collection = self_collections
2055 .get_mut(ingestion_id)
2056 .expect("known to exist");
2057 match &mut source_collection.description {
2058 CollectionDescription {
2059 data_source: DataSource::Ingestion(ingestion_desc),
2060 ..
2061 } => ingestion_desc.source_exports.insert(
2062 id,
2063 SourceExport {
2064 storage_metadata: (),
2065 details: details.clone(),
2066 data_config: data_config.clone(),
2067 },
2068 ),
2069 _ => unreachable!(
2070 "SourceExport must only refer to primary sources that already exist"
2071 ),
2072 };
2073
2074 self_collections.insert(id, collection_state);
2075 }
2076 DataSource::Table => {
2077 if is_in_txns(id, &metadata)
2080 && PartialOrder::less_than(
2081 &collection_state.write_frontier,
2082 &self.initial_txn_upper,
2083 )
2084 {
2085 collection_state
2091 .write_frontier
2092 .clone_from(&self.initial_txn_upper);
2093 }
2094 self_collections.insert(id, collection_state);
2095 }
2096 DataSource::Progress | DataSource::Other => {
2097 self_collections.insert(id, collection_state);
2098 }
2099 DataSource::Ingestion(_) => {
2100 self_collections.insert(id, collection_state);
2101 }
2102 DataSource::Sink { .. } => {
2103 self_collections.insert(id, collection_state);
2104 }
2105 }
2106
2107 self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);
2108
2109 self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
2111 }
2112
2113 drop(self_collections);
2114
2115 self.synchronize_finalized_shards(storage_metadata);
2116
2117 Ok(())
2118 }
2119
2120 async fn alter_ingestion_source_desc(
2121 &self,
2122 ingestion_id: GlobalId,
2123 source_desc: SourceDesc,
2124 ) -> Result<(), StorageError<Self::Timestamp>> {
2125 let mut self_collections = self.collections.lock().expect("lock poisoned");
2129 let collection = self_collections
2130 .get_mut(&ingestion_id)
2131 .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
2132
2133 let curr_ingestion = match &mut collection.description.data_source {
2134 DataSource::Ingestion(active_ingestion) => active_ingestion,
2135 _ => unreachable!("verified collection refers to ingestion"),
2136 };
2137
2138 curr_ingestion.desc = source_desc;
2139 debug!("altered {ingestion_id}'s SourceDesc");
2140
2141 Ok(())
2142 }
2143
2144 async fn alter_ingestion_export_data_configs(
2145 &self,
2146 source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
2147 ) -> Result<(), StorageError<Self::Timestamp>> {
2148 let mut self_collections = self.collections.lock().expect("lock poisoned");
2149
2150 for (source_export_id, new_data_config) in source_exports {
2151 let source_export_collection = self_collections
2154 .get_mut(&source_export_id)
2155 .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2156 let ingestion_id = match &mut source_export_collection.description.data_source {
2157 DataSource::IngestionExport {
2158 ingestion_id,
2159 details: _,
2160 data_config,
2161 } => {
2162 *data_config = new_data_config.clone();
2163 *ingestion_id
2164 }
2165 o => {
2166 tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2167 Err(StorageError::IdentifierInvalid(source_export_id))?
2168 }
2169 };
2170 let ingestion_collection = self_collections
2173 .get_mut(&ingestion_id)
2174 .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
2175
2176 match &mut ingestion_collection.description.data_source {
2177 DataSource::Ingestion(ingestion_desc) => {
2178 let source_export = ingestion_desc
2179 .source_exports
2180 .get_mut(&source_export_id)
2181 .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2182
2183 if source_export.data_config != new_data_config {
2184 tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
2185 source_export.data_config = new_data_config;
2186 } else {
2187 tracing::warn!(
2188 "alter_ingestion_export_data_configs called on \
2189 export {source_export_id} of {ingestion_id} but \
2190 the data config was the same"
2191 );
2192 }
2193 }
2194 o => {
2195 tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2196 Err(StorageError::IdentifierInvalid(ingestion_id))?;
2197 }
2198 }
2199 }
2200
2201 Ok(())
2202 }
2203
2204 async fn alter_ingestion_connections(
2205 &self,
2206 source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
2207 ) -> Result<(), StorageError<Self::Timestamp>> {
2208 let mut self_collections = self.collections.lock().expect("lock poisoned");
2209
2210 for (id, conn) in source_connections {
2211 let collection = self_collections
2212 .get_mut(&id)
2213 .ok_or_else(|| StorageError::IdentifierMissing(id))?;
2214
2215 match &mut collection.description.data_source {
2216 DataSource::Ingestion(ingestion) => {
2217 if ingestion.desc.connection != conn {
2220 info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
2221 ingestion.desc.connection = conn;
2222 } else {
2223 warn!(
2224 "update_source_connection called on {id} but the \
2225 connection was the same"
2226 );
2227 }
2228 }
2229 o => {
2230 warn!("update_source_connection called on {:?}", o);
2231 Err(StorageError::IdentifierInvalid(id))?;
2232 }
2233 }
2234 }
2235
2236 Ok(())
2237 }
2238
2239 async fn alter_table_desc(
2240 &self,
2241 existing_collection: GlobalId,
2242 new_collection: GlobalId,
2243 new_desc: RelationDesc,
2244 expected_version: RelationVersion,
2245 ) -> Result<(), StorageError<Self::Timestamp>> {
2246 let data_shard = {
2247 let self_collections = self.collections.lock().expect("lock poisoned");
2248 let existing = self_collections
2249 .get(&existing_collection)
2250 .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2251
2252 if existing.description.data_source != DataSource::Table {
2254 return Err(StorageError::IdentifierInvalid(existing_collection));
2255 }
2256
2257 existing.collection_metadata.data_shard
2258 };
2259
2260 let persist_client = self
2261 .persist
2262 .open(self.persist_location.clone())
2263 .await
2264 .unwrap();
2265
2266 let diagnostics = Diagnostics {
2268 shard_name: existing_collection.to_string(),
2269 handle_purpose: "alter_table_desc".to_string(),
2270 };
2271 let expected_schema = expected_version.into();
2273 let schema_result = persist_client
2274 .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2275 data_shard,
2276 expected_schema,
2277 &new_desc,
2278 &UnitSchema,
2279 diagnostics,
2280 )
2281 .await
2282 .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2283 tracing::info!(
2284 ?existing_collection,
2285 ?new_collection,
2286 ?new_desc,
2287 "evolved schema"
2288 );
2289
2290 match schema_result {
2291 CaESchema::Ok(id) => id,
2292 CaESchema::ExpectedMismatch {
2294 schema_id,
2295 key,
2296 val,
2297 } => {
2298 mz_ore::soft_panic_or_log!(
2299 "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2300 );
2301 return Err(StorageError::Generic(anyhow::anyhow!(
2302 "schema expected mismatch, {existing_collection:?}",
2303 )));
2304 }
2305 CaESchema::Incompatible => {
2306 mz_ore::soft_panic_or_log!(
2307 "incompatible schema! {existing_collection} {new_desc:?}"
2308 );
2309 return Err(StorageError::Generic(anyhow::anyhow!(
2310 "schema incompatible, {existing_collection:?}"
2311 )));
2312 }
2313 };
2314
2315 let (write_handle, since_handle) = self
2317 .open_data_handles(
2318 &new_collection,
2319 data_shard,
2320 None,
2321 new_desc.clone(),
2322 &persist_client,
2323 )
2324 .await;
2325
2326 {
2332 let mut self_collections = self.collections.lock().expect("lock poisoned");
2333
2334 let existing = self_collections
2336 .get_mut(&existing_collection)
2337 .expect("existing collection missing");
2338
2339 assert_eq!(existing.description.data_source, DataSource::Table);
2341 assert_none!(existing.description.primary);
2342
2343 existing.description.primary = Some(new_collection);
2345 existing.storage_dependencies.push(new_collection);
2346
2347 let implied_capability = existing.read_capabilities.frontier().to_owned();
2351 let write_frontier = existing.write_frontier.clone();
2352
2353 let mut changes = ChangeBatch::new();
2360 changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2361
2362 let collection_desc = CollectionDescription::for_table(new_desc.clone());
2364 let collection_meta = CollectionMetadata {
2365 persist_location: self.persist_location.clone(),
2366 relation_desc: collection_desc.desc.clone(),
2367 data_shard,
2368 txns_shard: Some(self.txns_read.txns_id().clone()),
2369 };
2370 let collection_state = CollectionState::new(
2371 collection_desc,
2372 implied_capability,
2373 write_frontier,
2374 Vec::new(),
2375 collection_meta,
2376 );
2377
2378 self_collections.insert(new_collection, collection_state);
2380
2381 let mut updates = BTreeMap::from([(new_collection, changes)]);
2382 StorageCollectionsImpl::update_read_capabilities_inner(
2383 &self.cmd_tx,
2384 &mut *self_collections,
2385 &mut updates,
2386 );
2387 };
2388
2389 self.register_handles(new_collection, true, since_handle, write_handle);
2391
2392 info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2393
2394 Ok(())
2395 }
2396
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for id in identifiers.iter() {
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                 controller via `synchronize_collections`"
            );

            let dropped_data_source = match self_collections.get(id) {
                Some(col) => col.description.data_source.clone(),
                None => continue,
            };

            // When dropping a subsource, remove it from its primary
            // ingestion's source exports.
            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
                let ingestion = match self_collections.get_mut(&ingestion_id) {
                    Some(ingestion) => ingestion,
                    None => {
                        tracing::error!(
                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
                        );
                        continue;
                    }
                };

                match &mut ingestion.description {
                    CollectionDescription {
                        data_source: DataSource::Ingestion(ingestion_desc),
                        ..
                    } => {
                        let removed = ingestion_desc.source_exports.remove(id);
                        mz_ore::soft_assert_or_log!(
                            removed.is_some(),
                            "dropped subsource {id} already removed from source exports"
                        );
                    }
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };
            }
        }

        // Install read policies that allow the since of each dropped
        // collection to advance all the way to the empty frontier, releasing
        // all read capabilities.
        let mut finalized_policies = Vec::new();

        for id in identifiers {
            if self_collections.contains_key(&id) {
                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
            }
        }
        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }

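    /// Installs the given read policies and applies any resulting read
    /// capability changes. When TRACE logging is enabled, also logs the
    /// per-collection capabilities of user collections before and after.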
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }

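    /// Acquires and returns a [`ReadHold`] for each of the given collections,
    /// at the collection's current overall since frontier. The holds report
    /// their changes back through `holds_tx`.
    ///
    /// A minimal usage sketch (`storage_collections` and `id` are
    /// hypothetical names, not part of this module):
    ///
    /// ```ignore
    /// let mut holds = storage_collections.acquire_read_holds(vec![id])?;
    /// let hold = holds.pop().expect("requested exactly one hold");
    /// // Reads at frontiers not less than the hold's since are now
    /// // guaranteed to succeed; dropping the hold releases the capability.
    /// ```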
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        // We can only acquire holds at or beyond a collection's current
        // overall since, so each desired hold is advanced to that frontier.
        let mut advanced_holds = Vec::new();
        for id in desired_holds.iter() {
            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }

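    /// Determines how a collection's timestamps relate to wall-clock time,
    /// following `IngestionExport`s to their primary ingestion. Returns
    /// `None` for collections that do not follow wall-clock time (for
    /// example, load generators) and an error for unknown collections.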
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);

        let mut result = None;

        while let Some(c) = collection.take() {
            use DataSource::*;
            if let Some(timeline) = &c.description.timeline {
                // Only the epoch timeline follows wall-clock time.
                if *timeline != Timeline::EpochMilliseconds {
                    break;
                }
            }
            match &c.description.data_source {
                Ingestion(ingestion) => {
                    use GenericSourceConnection::*;
                    match ingestion.desc.connection {
                        // These source types follow wall-clock time.
                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                            result = Some(TimeDependence::default())
                        }
                        // Load generators are not tied to wall-clock time.
                        LoadGenerator(_) => {}
                    }
                }
                IngestionExport { ingestion_id, .. } => {
                    // Subsources inherit their primary ingestion's behavior.
                    let c = collections
                        .get(ingestion_id)
                        .ok_or(CollectionMissing(*ingestion_id))?;
                    collection = Some(c);
                }
                // These collections are written at wall-clock time.
                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                    result = Some(TimeDependence::default())
                }
                Other => {}
                Sink { .. } => {}
            };
        }
        Ok(result)
    }

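    /// Renders the internal state of this `StorageCollections` as JSON, for
    /// debugging. Handles and channels are skipped; shard sets, collection
    /// state, and configuration are rendered as strings.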
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        let Self {
            envd_epoch,
            read_only,
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read: _,
            config,
            initial_txn_upper,
            persist_location,
            persist: _,
            cmd_tx: _,
            holds_tx: _,
            _background_task: _,
            _finalize_shards_task: _,
        } = self;

        let finalizable_shards: Vec<_> = finalizable_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let finalized_shards: Vec<_> = finalized_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let collections: BTreeMap<_, _> = collections
            .lock()
            .expect("poisoned")
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let config = format!("{:?}", config.lock().expect("poisoned"));

        Ok(serde_json::json!({
            "envd_epoch": envd_epoch,
            "read_only": read_only,
            "finalizable_shards": finalizable_shards,
            "finalized_shards": finalized_shards,
            "collections": collections,
            "config": config,
            "initial_txn_upper": initial_txn_upper,
            "persist_location": format!("{persist_location:?}"),
        }))
    }
}

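/// A wrapper around the two kinds of persist since handles we use: a
/// critical (epoch-fenced) [`SinceHandle`] when this process is in charge of
/// the shard's since, and a leased [`ReadHandle`] otherwise, for example
/// when running in read-only mode. The leased variant carries no opaque
/// epoch, which `opaque` models as `PersistEpoch(None)`.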
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}

impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
                // Leased handles have no opaque epoch; callers are expected
                // to pass this placeholder back in unchanged, which the
                // downgrade methods below assert.
                PersistEpoch(None)
            }
        }
    }

    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

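/// The in-memory state we maintain for each collection.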
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// The description with which the collection was created.
    pub description: CollectionDescription<T>,

    /// The accumulation of all read capabilities (holds) on this collection,
    /// including the implied capability. Its frontier is the collection's
    /// overall since.
    pub read_capabilities: MutableAntichain<T>,

    /// The capability we hold on behalf of the collection itself, derived
    /// from `read_policy` and the write frontier.
    pub implied_capability: Antichain<T>,

    /// The policy that maps the write frontier to the implied capability.
    pub read_policy: ReadPolicy<T>,

    /// Storage collections on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// The most recently reported upper of the collection.
    pub write_frontier: Antichain<T>,

    /// The persist metadata (location and shards) backing this collection.
    pub collection_metadata: CollectionMetadata,
}

impl<T: TimelyTimestamp> CollectionState<T> {
    pub fn new(
        description: CollectionDescription<T>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            description,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

    /// Reports whether all read capabilities have been released, that is,
    /// whether the collection's overall since is the empty frontier.
    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}

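/// A task that keeps the in-memory collection state up to date: it waits for
/// write handles to report new uppers, applies read-hold changes arriving on
/// a channel, and downgrades persist since handles on command.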
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    /// The shard currently backing each collection, used to ignore upper
    /// updates from handles whose shard has since been replaced or dropped.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}

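/// Commands that can be sent to a [`BackgroundTask`].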
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}

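/// A newtype wrapper around the future that resolves to snapshot statistics,
/// needed because the boxed future itself is not `Debug`.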
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    async fn run(&mut self) {
        // Futures that resolve once a write handle observes an upper past
        // the previously known one, yielding the handle back together with
        // the new upper.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // The collection is now backed by a different
                            // shard, so this handle is stale: expire it and
                            // don't re-enqueue a future for it.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        // The command channel is closed: shut down.
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register { id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            if is_in_txns {
                                // Collections in the txns system get their
                                // uppers from the txns shard, not from their
                                // own write handle.
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }
                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            // Send back a future that the caller can await,
                            // rather than blocking this loop while the stats
                            // become available.
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            // It's fine if the receiver has hung up.
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                // This can happen when a collection is dropped concurrently.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

            let epoch = since_handle.opaque();
            let result = if new_since.is_empty() {
                // A since of the empty frontier means the collection was
                // dropped: downgrade unconditionally and retire our handles
                // for it.
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if !self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                    return;
                }

                info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");

                self.finalizable_shards.lock().insert(dropped_shard_id);

                res
            } else {
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
}

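/// Configuration for [`finalize_shards_task`].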
struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}

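/// A task that periodically attempts to finalize the shards in
/// `finalizable_shards`: it advances each shard's upper to the empty
/// frontier, optionally force-downgrades the critical since, and then asks
/// persist to finalize the shard. Successfully finalized shards move to
/// `finalized_shards`. Does nothing in read-only mode.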
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
            // Take a snapshot of the set and release the lock, so we don't
            // hold it across the await points below.
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // Advance the shard's upper to the empty frontier,
                        // signaling that no more writes are coming.
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                // Our epoch is newer than the handle's, so
                                // expect the handle's epoch in the
                                // compare-and-downgrade below.
                                handle_epoch
                            } else {
                                // The handle's epoch is at least as new as
                                // ours; pass ours so that the downgrade
                                // fails if we have been fenced out.
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // Move the shard from the finalizable set to the
                        // finalized set.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

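/// The as-of at which to compute [`SnapshotStats`].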
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard that is not registered with the txns system, at the
    /// given as-of frontier.
    Direct(Antichain<T>),
    /// Stats for a txns-managed shard, resolved via a [`DataSnapshot`].
    Txns(DataSnapshot<T>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // Stats for unknown collections error out.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats at an as-of that the shard has not reached yet are not
        // immediately available.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Issue the same request again; it should resolve once the shard's
        // upper passes the as-of.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Appending more data past the as-of resolves the pending stats
        // request from above.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        drop(background_task);
    }

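    /// Test helper that requests [`SnapshotStats`] from the
    /// [`BackgroundTask`] via a [`BackgroundCmd::SnapshotStats`] command and
    /// awaits the result.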
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}