use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::iter;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::{Opaque, SinceHandle};
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

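/// An abstraction for keeping track of storage collections and their
/// frontiers: it knows which collections exist, maintains read capabilities
/// (sinces) for them, and reports their write frontiers (uppers). Continual
/// work, such as downgrading the persist since, is driven by a background
/// task.
///
/// A minimal usage sketch (illustrative only):
///
/// ```ignore
/// // Acquire holds so the collection's since cannot advance past the
/// // frontier we want to read at.
/// let holds = storage_collections.acquire_read_holds(vec![id])?;
/// let frontiers = storage_collections.collection_frontiers(id)?;
/// let as_of = frontiers.read_capabilities.as_option().copied().unwrap();
/// let snapshot = storage_collections.snapshot(id, as_of).await?;
/// drop(holds);
/// ```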
#[async_trait]
pub trait StorageCollections: Debug + Sync {
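    /// Creates and initializes state for all collections in `init_ids` that
    /// don't yet have metadata in the given `txn`, and primes the set of
    /// shards eligible for finalization from the txn's unfinalized shards.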
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    fn update_parameters(&self, config_params: StorageParameters);

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    fn collection_frontiers(&self, id: GlobalId) -> Result<CollectionFrontiers, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers>, CollectionMissing>;

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers>;

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError>;

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError>;

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>>;

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>>;

    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError>;

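    /// Acquires a [SnapshotCursor] for the identified collection at `as_of`,
    /// which can be used to read the snapshot in chunks.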
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>>;

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    >;

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
    >;

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError>;

    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError>;

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>);

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold>, CollectionMissing>;

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}

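/// A cursor over a snapshot, allowing us to read a snapshot in chunks rather
/// than having to hold it in memory in its entirety.
///
/// A minimal usage sketch (illustrative only, assuming a cursor obtained from
/// `StorageCollections::snapshot_cursor`):
///
/// ```ignore
/// let mut cursor = storage_collections.snapshot_cursor(id, as_of).await?;
/// while let Some(chunk) = cursor.next().await {
///     for (data, ts, diff) in chunk {
///         // Process each update in the snapshot.
///     }
/// }
/// ```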
pub struct SnapshotCursor {
    /// Holds on to the read handle so that the leases backing the cursor stay
    /// valid while it is being consumed.
    pub _read_handle: ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    pub cursor: Cursor<SourceData, (), Timestamp, StorageDiff>,
}

impl SnapshotCursor {
    pub async fn next(
        &mut self,
    ) -> Option<impl Iterator<Item = (SourceData, Timestamp, StorageDiff)> + Sized + '_> {
        let iter = self.cursor.next().await?;
        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
    }
}

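/// Frontiers of the collection identified by `id`.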
#[derive(Debug)]
pub struct CollectionFrontiers {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<Timestamp>,

    /// The since frontier that is implied by the collection's read policy.
    pub implied_capability: Antichain<Timestamp>,

    /// The frontier of all outstanding read capabilities, including all
    /// read holds given out for the collection.
    pub read_capabilities: Antichain<Timestamp>,
}

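/// Implementation of [StorageCollections] that keeps track of collection
/// state in memory, shared between all clones of this handle, and drives
/// frontier maintenance and shard finalization from background tasks.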
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl {
    /// The fencing token for this instance of the controller.
    envd_epoch: NonZeroI64,

    /// Whether this instance is in read-only mode.
    ///
    /// In read-only mode we are not allowed to affect changes to external
    /// systems, largely persist.
    read_only: bool,

    /// The set of shards that we still have to finalize.
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of shards that we have finalized but not yet recorded as such.
    finalized_shards: Arc<ShardIdSet>,

    /// In-memory state for each known collection.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,

    /// Handle for reading from the txn-wal shard.
    txns_read: TxnsRead<Timestamp>,

    /// Storage configuration, including dyncfgs.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn-wal shard as of when we booted.
    initial_txn_upper: Antichain<Timestamp>,

    /// The location of the persist store.
    persist_location: PersistLocation,

    /// A cache of persist clients.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal background task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd>,

    /// For sending read-hold changes to our internal background task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<Timestamp>)>,

    /// Handles to tasks we own, so they are aborted when we are dropped.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

/// The result streamed by `snapshot_and_stream`: source data together with
/// timestamps and diffs.
type SourceDataStream = BoxStream<'static, (SourceData, Timestamp, StorageDiff)>;

impl StorageCollectionsImpl {
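    /// Creates and initializes a new [StorageCollectionsImpl].
    ///
    /// The txn-wal shard must already be registered in the given
    /// [StorageTxn]; that is, initialization must have been prepared before
    /// calling this.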
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Opening the txns handle ensures that the txn-wal shard exists and
        // is initialized; the handle itself is not used afterwards.
        let _txns_handle: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow> =
            TxnsHandle::open(
                Timestamp::MIN,
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task(FinalizeShardsTaskConfig {
                envd_epoch,
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

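    /// Opens a write handle and a since handle for the given shard.
    ///
    /// In read-only mode the since handle is a leased read handle; otherwise
    /// it is the critical (persistent) controller since handle. If `since` is
    /// given, the handle's since is advanced up to it.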
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        SinceHandleWrapper,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            persist_client
                .upgrade_version::<SourceData, (), Timestamp, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // Ensure the write handle has a recent upper cached before we hand it
        // out, so callers see an up-to-date write frontier.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

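    /// Opens the critical since handle for the given shard, fencing out any
    /// previous controller generation (identified by a lower `envd_epoch`)
    /// via the handle's opaque value. Must not be called in read-only mode.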
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), Timestamp, StorageDiff> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&PersistEpoch::default()),
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(Timestamp::MIN),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            // Install our epoch as the handle's opaque, while retaining the
            // since. Loop because compare_and_downgrade_since can race with
            // concurrent instances; halt if a newer epoch has fenced us out.
            loop {
                let current_epoch: PersistEpoch = handle.opaque().decode();

                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &Opaque::encode(&current_epoch),
                            (&Opaque::encode(&PersistEpoch::from(our_epoch)), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

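    /// Opens a leased read handle for the given shard, downgraded to `since`
    /// (or the minimum timestamp, if none is given). Used in place of the
    /// critical handle when running in read-only mode.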
    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<Timestamp>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(Timestamp::MIN),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper,
        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf,
    ) -> Result<SnapshotStats, StorageError> {
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState>,
        id: GlobalId,
    ) -> Result<(), StorageError> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

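    /// Determines the identifiers of the collections that the described
    /// collection depends on: its primary collection (if any), plus
    /// source-specific dependencies such as an ingestion's remap collection.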
    fn determine_collection_dependencies(
        self_collections: &BTreeMap<GlobalId, CollectionState>,
        source_id: GlobalId,
        collection_desc: &CollectionDescription,
    ) -> Result<Vec<GlobalId>, StorageError> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => (),
                    _ => dependencies.push(*remap_collection_id),
                }
            }
            DataSource::Ingestion(ingestion) => {
                if ingestion.remap_collection_id != source_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
        }

        Ok(dependencies)
    }

    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<Timestamp>,
    ) -> Result<(), StorageError> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(*time, 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<Timestamp>, StorageError> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        let write = persist_client
            .open_writer::<SourceData, (), Timestamp, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
        txns_read: &TxnsRead<Timestamp>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata,
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // Make sure the txn-wal state is caught up to `as_of`
                    // before taking a data snapshot of the shard.
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
        txns_read: &TxnsRead<Timestamp>,
    ) -> BoxFuture<'static, Result<SourceDataStream, StorageError>> {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata,
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState>,
        policies: Vec<(GlobalId, ReadPolicy)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

    /// Applies `updates` to the in-memory read capabilities, propagates the
    /// changes to the collections' storage dependencies, and hands any
    /// resulting since changes to the background task for downgrading the
    /// persist since.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd>,
        collections: &mut BTreeMap<GlobalId, CollectionState>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<Timestamp>>,
    ) {
        let mut collections_net = BTreeMap::new();

        // Repeatedly extract the maximum id, and the updates for it.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Purely negative updates for an already-dropped
                    // collection are fine; ignore them.
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                     read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                         install read capabilities before the current \
                         frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                let collection = collections.get(&key).expect("must still exist");
                // Only collections without a primary drive persist compaction
                // for their shard.
                let should_emit_persist_compaction = collection.primary.is_none();

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }

    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
}

#[async_trait]
impl StorageCollections for StorageCollectionsImpl {
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }

    fn update_parameters(&self, config_params: StorageParameters) {
        config_params.dyncfg_updates.apply(self.persist.cfg());

        self.config
            .lock()
            .expect("lock poisoned")
            .update(config_params);
    }

    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .get(&id)
            .map(|c| c.collection_metadata.clone())
            .ok_or(CollectionMissing(id))
    }

    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
        let collections = self.collections.lock().expect("lock poisoned");

        collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| (*id, c.collection_metadata.clone()))
            .collect()
    }

    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers>, CollectionMissing> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let collections = self.collections.lock().expect("lock poisoned");

        let res = ids
            .into_iter()
            .map(|id| {
                collections
                    .get(&id)
                    .map(|c| CollectionFrontiers {
                        id,
                        write_frontier: c.write_frontier.clone(),
                        implied_capability: c.implied_capability.clone(),
                        read_capabilities: c.read_capabilities.frontier().to_owned(),
                    })
                    .ok_or(CollectionMissing(id))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(res)
    }

    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers> {
        let collections = self.collections.lock().expect("lock poisoned");

        let res = collections
            .iter()
            .filter(|(_id, c)| !c.is_dropped())
            .map(|(id, c)| CollectionFrontiers {
                id: *id,
                write_frontier: c.write_frontier.clone(),
                implied_capability: c.implied_capability.clone(),
                read_capabilities: c.read_capabilities.frontier().to_owned(),
            })
            .collect_vec();

        res
    }

    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let metadata = self.collection_metadata(id)?;

        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of)
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }

    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(*as_of).await;
                let data_snapshot = self.txns_read.data_snapshot(data_shard, *as_of).await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }

    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        self.snapshot(id, as_of, &self.txns_read)
    }

    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &Timestamp::MIN => {
                // The upper itself is not readable, so read at the time just
                // before it.
                let as_of = f.step_back().expect("checked that f > &Timestamp::MIN");

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await?;
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                // The collection must be empty!
                Vec::new()
            }
            _ => {
                // The collection is closed; there is no upper to read just
                // before.
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }

    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>> {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata,
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    > {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }

    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e.into()) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }

    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError> {
        let collections = self.collections.lock().expect("lock poisoned");

        if collections.contains_key(&id) {
            Ok(())
        } else {
            Err(StorageError::IdentifierMissing(id))
        }
    }

    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError> {
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        // Delete the metadata for any dropped collections.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        // Only mark a shard as eligible for finalization if the dropped
        // collection has not been superseded by another collection (its
        // primary) that still uses the same shard.
        let mut dropped_shards = BTreeSet::new();
        {
            let collections = self.collections.lock().expect("poisoned");
            for (id, shard) in dropped_mappings {
                let coll = collections.get(&id).expect("must exist");
                if coll.primary.is_none() {
                    dropped_shards.insert(shard);
                }
            }
        }
        txn.insert_unfinalized_shards(dropped_shards)?;

        // Reconcile any shards we have successfully finalized with the
        // durable shard-finalization state.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }

    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. Exact duplicates are
        // fine, but using the same identifier with different descriptions is
        // an error.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // Enrich each collection description with its metadata: which shard
        // to use and whether it is managed by txn-wal.
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;

        use futures::stream::{StreamExt, TryStreamExt};

        // Open the persist handles for all collections in parallel, with
        // bounded concurrency.
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    // Collections that have a primary collection derive their
                    // since from it, so we don't provide an initial since for
                    // them here.
                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[Timestamp::MIN]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts)),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError>((id, description, write, since_handle, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

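        // Register the collections in dependency order: a collection's since
        // is constrained by the sinces of its dependencies, so each
        // dependency must already be present in `self_collections` when we
        // process its dependents below.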
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            // Tables come first, in reverse id order: a later version of a
            // table (created by `alter_table_desc`) is a dependency of the
            // earlier versions.
            Table(Reverse<GlobalId>),
            // Other collections come next.
            Collection(GlobalId),
            // Sinks come last, since they can depend on any of the above.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            // A collection's initial since is constrained by the since of its
            // (at most one) dependency.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency's since is ahead of the shard's
                        // since, so the dependency constrains what we can
                        // read; start from its since instead.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[Timestamp::MIN]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper; \
                             dependent ({id}): since {:?}, upper {:?}; dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    // Collections in a non-wallclock timeline have no
                    // dependence on wall-clock time.
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                LoadGenerator(_) => None,
                            }
                        }
                        IngestionExport { ingestion_id, .. } => {
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            match &description.data_source {
                DataSource::Table => {
                    // Tables are written through the txn-wal shard, whose
                    // upper can be ahead of the table shard's upper. Advance
                    // the in-memory write frontier so we don't report a
                    // frontier that later regresses.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Introspection(_)
                | DataSource::Webhook
                | DataSource::IngestionExport { .. }
                | DataSource::Progress
                | DataSource::Other
                | DataSource::Ingestion(_)
                | DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }

    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError> {
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        // Evolve the schema of the underlying shard to the new relation
        // description.
        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        match schema_result {
            CaESchema::Ok(id) => id,
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expectation mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // A collection can be altered only once; afterwards the new
            // collection is its primary.
            assert_none!(existing.primary);

            existing.primary = Some(new_collection);
            existing.storage_dependencies.push(new_collection);

            // The new collection inherits the existing collection's read
            // capabilities and write frontier.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            // Install the new collection with a read hold at everything the
            // existing collection could read.
            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (*t, 1)));

            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: new_desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                None,
                existing.time_dependence.clone(),
                existing.ingestion_remap_collection_id.clone(),
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }

    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        let mut finalized_policies = Vec::new();

        for id in identifiers {
            // The collection might already have been dropped.
            let Some(collection) = self_collections.get(&id) else {
                continue;
            };

            if collection.primary.is_none() {
                // A collection without a primary owns its shard, so dropping
                // it must have been synchronized with the storage controller:
                // the shard metadata should already be gone.
                let metadata = storage_metadata.get_collection_shard(id);
                mz_ore::soft_assert_or_log!(
                    matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                    "dropping {id}, but drop was not synchronized with storage \
                    controller via `prepare_state`"
                );
            }

            // Dropping a collection is equivalent to allowing its since to
            // advance to the empty frontier.
            finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
        }

        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }

    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }

    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold>, CollectionMissing> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        // Advance each hold to the collection's current overall since: we
        // cannot acquire holds for times that have already been compacted
        // away.
        let mut advanced_holds = Vec::new();
        for id in desired_holds.iter() {
            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (*time, 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }

    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
        Ok(state.time_dependence.clone())
    }

    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Destructure `self`, so we get a compile error when new fields are
        // added but not considered here.
        let Self {
            envd_epoch,
            read_only,
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read: _,
            config,
            initial_txn_upper,
            persist_location,
            persist: _,
            cmd_tx: _,
            holds_tx: _,
            _background_task: _,
            _finalize_shards_task: _,
        } = self;

        let finalizable_shards: Vec<_> = finalizable_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let finalized_shards: Vec<_> = finalized_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let collections: BTreeMap<_, _> = collections
            .lock()
            .expect("poisoned")
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let config = format!("{:?}", config.lock().expect("poisoned"));

        Ok(serde_json::json!({
            "envd_epoch": envd_epoch,
            "read_only": read_only,
            "finalizable_shards": finalizable_shards,
            "finalized_shards": finalized_shards,
            "collections": collections,
            "config": config,
            "initial_txn_upper": initial_txn_upper,
            "persist_location": format!("{persist_location:?}"),
        }))
    }
}

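/// Wraps either a critical since handle or a leased read handle, depending on
/// whether we are allowed to durably modify the shard's since (read-write
/// mode) or not (read-only mode), and presents a unified interface for
/// downgrading the since and querying snapshot stats.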
2372#[derive(Debug)]
2379enum SinceHandleWrapper {
2380 Critical(SinceHandle<SourceData, (), Timestamp, StorageDiff>),
2381 Leased(ReadHandle<SourceData, (), Timestamp, StorageDiff>),
2382}
2383
2384impl SinceHandleWrapper {
2385 pub fn since(&self) -> &Antichain<Timestamp> {
2386 match self {
2387 Self::Critical(handle) => handle.since(),
2388 Self::Leased(handle) => handle.since(),
2389 }
2390 }
2391
2392 pub fn opaque(&self) -> PersistEpoch {
2393 match self {
2394 Self::Critical(handle) => handle.opaque().decode(),
2395 Self::Leased(_handle) => {
                PersistEpoch(None)
            }
        }
    }

    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
    ) -> Result<Antichain<Timestamp>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle
                .compare_and_downgrade_since(
                    &Opaque::encode(expected),
                    (&Opaque::encode(opaque), since),
                )
                .await
                .map_err(|e| e.decode()),
            Self::Leased(handle) => {
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
    ) -> Option<Result<Antichain<Timestamp>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => handle
                .maybe_compare_and_downgrade_since(
                    &Opaque::encode(expected),
                    (&Opaque::encode(opaque), since),
                )
                .await
                .map(|r| r.map_err(|o| o.decode())),
            Self::Leased(handle) => {
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<Timestamp>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}

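/// State that `StorageCollections` maintains for each collection: its read
/// capabilities, read policy, write frontier, and metadata.
///
/// An illustrative sketch of the lifecycle (not a doctest; `since`,
/// `write_frontier`, and `metadata` are assumed to be in scope):
///
/// ```ignore
/// let state = CollectionState::new(
///     None,           // primary
///     None,           // time_dependence
///     None,           // ingestion_remap_collection_id
///     since,
///     write_frontier,
///     Vec::new(),     // storage_dependencies
///     metadata,
/// );
/// assert!(!state.is_dropped());
/// ```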
#[derive(Debug, Clone)]
struct CollectionState {
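    /// The ID of the primary collection this collection is associated with,
    /// if any.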
    primary: Option<GlobalId>,

    time_dependence: Option<TimeDependence>,
    ingestion_remap_collection_id: Option<GlobalId>,

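    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulates both the implied capability and the capabilities of
    /// all outstanding read holds.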
    pub read_capabilities: MutableAntichain<Timestamp>,

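    /// The implicit capability associated with the collection itself, held
    /// and downgraded on behalf of the `read_policy`.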
    pub implied_capability: Antichain<Timestamp>,

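    /// The policy that determines how the implied capability is downgraded
    /// as the write frontier advances.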
    pub read_policy: ReadPolicy,

    pub storage_dependencies: Vec<GlobalId>,

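    /// Reported write frontier of the collection.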
    pub write_frontier: Antichain<Timestamp>,

    pub collection_metadata: CollectionMetadata,
}

impl CollectionState {
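    /// Creates a new collection state, with an initial read policy valid
    /// from `since`.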
    pub fn new(
        primary: Option<GlobalId>,
        time_dependence: Option<TimeDependence>,
        ingestion_remap_collection_id: Option<GlobalId>,
        since: Antichain<Timestamp>,
        write_frontier: Antichain<Timestamp>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (*time, 1)));
        Self {
            primary,
            time_dependence,
            ingestion_remap_collection_id,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

    pub fn is_dropped(&self) -> bool {
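        // A collection counts as dropped once no read capabilities remain.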
        self.read_capabilities.is_empty()
    }
}

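/// A task that keeps the in-memory collection state up to date in the
/// background: it waits for upper changes of registered write handles,
/// batches and applies read-hold changes, and downgrades shard sinces.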
#[derive(Debug)]
struct BackgroundTask {
    config: Arc<Mutex<StorageConfiguration>>,
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd>,
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<Timestamp>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,
    shard_by_id: BTreeMap<GlobalId, ShardId>,
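    /// The since handle for each registered collection, used to apply since
    /// downgrades.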
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper>,
    txns_handle: Option<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    txns_shards: BTreeSet<GlobalId>,
}

#[derive(Debug)]
enum BackgroundCmd {
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        since_handle: SinceHandleWrapper,
    },
    DowngradeSince(Vec<(GlobalId, Antichain<Timestamp>)>),
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf,
        oneshot::Sender<SnapshotStatsRes>,
    ),
}

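/// The response to a snapshot-stats request: a future the caller awaits, so
/// that gathering stats never blocks the background task. The newtype exists
/// so we can give the boxed future a `Debug` impl.
///
/// An illustrative sketch of the request/response flow (not a doctest; it
/// mirrors the `snapshot_stats` test helper below):
///
/// ```ignore
/// let (tx, rx) = oneshot::channel();
/// cmds_tx.send(BackgroundCmd::SnapshotStats(id, as_of, tx)).unwrap();
/// let stats = rx.await.expect("BackgroundTask should be live").0.await;
/// ```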
pub(crate) struct SnapshotStatsRes(BoxFuture<'static, Result<SnapshotStats, StorageError>>);

impl Debug for SnapshotStatsRes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}

impl BackgroundTask {
    async fn run(&mut self) {
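        // Futures that wait for a write handle's upper to advance past its
        // previously observed value; each resolves with the handle so that
        // it can be re-armed for the next change.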
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                        Output = (
                            GlobalId,
                            WriteHandle<SourceData, (), Timestamp, StorageDiff>,
                            Antichain<Timestamp>,
                        ),
                    > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<Timestamp>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
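                            // The handle still belongs to the current shard,
                            // so process the new upper and re-arm the future.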
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
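                            // The collection was re-registered with a
                            // different shard, so this handle is stale:
                            // expire it and don't re-arm.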
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };

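                    // Drain any commands that are already queued, so that we
                    // can process them as one batch, most importantly the
                    // since downgrades collected below.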
                    let commands = iter::once(cmd).chain(
                        iter::from_fn(|| self.cmds_rx.try_recv().ok())
                    );
                    let mut downgrades = BTreeMap::<_, Antichain<_>>::new();
                    for cmd in commands {
                        match cmd {
                            BackgroundCmd::Register {
                                id,
                                is_in_txns,
                                write_handle,
                                since_handle,
                            } => {
                                debug!("registering handles for {}", id);
                                let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                                if previous.is_some() {
                                    panic!("already registered a WriteHandle for collection {id}");
                                }

                                let previous = self.since_handles.insert(id, since_handle);
                                if previous.is_some() {
                                    panic!("already registered a SinceHandle for collection {id}");
                                }

                                if is_in_txns {
                                    self.txns_shards.insert(id);
                                } else {
                                    let upper = write_handle.upper().clone();
                                    if !upper.is_empty() {
                                        let fut = gen_upper_future(id, write_handle, upper);
                                        upper_futures.push(fut.boxed());
                                    }
                                }
                            }
                            BackgroundCmd::DowngradeSince(cmds) => {
                                for (id, new) in cmds {
                                    downgrades
                                        .entry(id)
                                        .and_modify(|since| since.join_assign(&new))
                                        .or_insert(new);
                                }
                            }
                            BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                                let res = match self.since_handles.get(&id) {
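                                    // Don't await the stats here, that would
                                    // block the background task; instead hand
                                    // a future back to the requester.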
                                    Some(x) => {
                                        let fut: BoxFuture<
                                            'static,
                                            Result<SnapshotStats, StorageError>,
                                        > = match as_of {
                                            SnapshotStatsAsOf::Direct(as_of) => {
                                                x.snapshot_stats(id, Some(as_of))
                                            }
                                            SnapshotStatsAsOf::Txns(data_snapshot) => {
                                                x.snapshot_stats_from_txn(id, data_snapshot)
                                            }
                                        };
                                        SnapshotStatsRes(fut)
                                    }
                                    None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                        StorageError::IdentifierMissing(id),
                                    )))),
                                };
                                let _ = tx.send(res);
                            }
                        }
                    }

                    if !downgrades.is_empty() {
                        self.downgrade_sinces(downgrades).await;
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| (id.clone(), c.clone()))
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<Timestamp>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

    async fn downgrade_sinces(&mut self, cmds: BTreeMap<GlobalId, Antichain<Timestamp>>) {
        let mut futures = Vec::with_capacity(cmds.len());
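        // Take the handles out of the map so the downgrades can run
        // concurrently; they are put back below once the futures resolve.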
        for (id, new_since) in cmds {
            let Some(mut since_handle) = self.since_handles.remove(&id) else {
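                // The collection was likely dropped concurrently, so there
                // is no handle left to downgrade.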
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            let fut = async move {
                if id.is_user() {
                    trace!("downgrading since of {} to {:?}", id, new_since);
                }

                let epoch = since_handle.opaque().clone();
                let result = if new_since.is_empty() {
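                    // An empty since means the collection was dropped, so we
                    // downgrade unconditionally instead of leaving it to the
                    // rate-limited "maybe" variant.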
                    Some(
                        since_handle
                            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                            .await,
                    )
                } else {
                    since_handle
                        .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await
                };
                (id, since_handle, result)
            };
            futures.push(fut);
        }

        for (id, since_handle, result) in futures::future::join_all(futures).await {
            let new_since = match result {
                Some(Ok(since)) => Some(since),
                Some(Err(other_epoch)) => mz_ore::halt!(
                    "fenced by envd @ {other_epoch:?}. ours = {:?}",
                    since_handle.opaque(),
                ),
                None => None,
            };

            self.since_handles.insert(id, since_handle);

            if new_since.is_some_and(|s| s.is_empty()) {
                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let Some(dropped_shard_id) = self.shard_by_id.remove(&id) else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        %id, %dropped_shard_id,
                        "enqueuing shard finalization due to dropped collection and dropped \
                        persist handle",
                    );
                    self.finalizable_shards.lock().insert(dropped_shard_id);
                } else {
                    info!(
                        "not triggering shard finalization due to dropped storage object \
                        because enable_storage_shard_finalization parameter is false"
                    );
                }
            }
        }
    }
}

struct FinalizeShardsTaskConfig {
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    finalizable_shards: Arc<ShardIdSet>,
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    read_only: bool,
}

async fn finalize_shards_task(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) {
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
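            // Clone the set out so the lock is only held briefly; the actual
            // finalization work below runs without it.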
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

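        // Reborrow what the stream closures below need, so they can be
        // `move` closures without taking ownership.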
        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

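                        // First render the shard unwritable by advancing its
                        // upper to the empty antichain.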
                        let mut write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let our_opaque = Opaque::encode(&epoch);
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                Timestamp,
                                StorageDiff,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    our_opaque.clone(),
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_opaque = since_handle.opaque().clone();
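                            // Pick the opaque to compare-and-downgrade with:
                            // if our epoch is newer than the one in the
                            // handle, use the handle's current opaque (which
                            // will match); otherwise try our own and let the
                            // downgrade fail below.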
                            let opaque = if our_opaque.codec_name() == handle_opaque.codec_name()
                                && epoch.0 > handle_opaque.decode::<PersistEpoch>().0
                            {
                                handle_opaque
                            } else {
                                our_opaque
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&opaque, (&opaque, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

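                        // With the upper (and possibly the since) advanced to
                        // the empty antichain, the shard can be finalized.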
                        persist_client
                            .finalize_shard::<SourceData, (), Timestamp, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            .buffer_unordered(10)
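            // Bound the number of shards being finalized concurrently.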
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        {
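                            // Move the shard from the finalizable set to the
                            // finalized set while holding both locks, so
                            // observers never see it in both or neither.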
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}

#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf {
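    /// The collection is not registered with the txn-wal system, so its
    /// stats can be fetched directly at the given as_of.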
    Direct(Antichain<Timestamp>),
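    /// The collection lives in the txn-wal system, so stats must be fetched
    /// via a `DataSnapshot`.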
    Txns(DataSnapshot<Timestamp>),
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&PersistEpoch::default()),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

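        // Requesting stats for an unregistered collection must error.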
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

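        // Stats for an as_of that the upper hasn't passed yet must not
        // resolve right away.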
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

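        // Keep a second stats future around; it should resolve only once the
        // upper passes ts 1.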
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

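        // Write a row at ts 0, advancing the upper to 1.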
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

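        // Stats at ts 0 are available now.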
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

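        // Write another row at ts 1, advancing the upper to 2, which lets
        // the pending ts-1 stats future resolve.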
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

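        // The task handle was kept alive for the duration of the test;
        // release it now.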
        drop(background_task);
    }

    async fn snapshot_stats(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd>,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl BackgroundTask {
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}