1use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::iter;
16use std::num::NonZeroI64;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::future::BoxFuture;
23use futures::stream::{BoxStream, FuturesUnordered};
24use futures::{Future, FutureExt, StreamExt};
25use itertools::Itertools;
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::NowFn;
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::{Opaque, SinceHandle};
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::codec_impls::UnitSchema;
40use mz_persist_types::txn::TxnsCodec;
41use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
42use mz_storage_types::StorageDiff;
43use mz_storage_types::configuration::StorageConfiguration;
44use mz_storage_types::connections::ConnectionContext;
45use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
46use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
47use mz_storage_types::errors::CollectionMissing;
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
52use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
53use mz_txn_wal::metrics::Metrics as TxnMetrics;
54use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
55use mz_txn_wal::txns::TxnsHandle;
56use timely::PartialOrder;
57use timely::progress::frontier::MutableAntichain;
58use timely::progress::{Antichain, ChangeBatch};
59use tokio::sync::{mpsc, oneshot};
60use tokio::time::MissedTickBehavior;
61use tracing::{debug, info, trace, warn};
62
63use crate::client::TimestamplessUpdateBuilder;
64use crate::controller::{
65 CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
66};
67use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
68
69mod metrics;
70
/// An abstraction for interacting with storage collections: durable
/// collection metadata, frontiers, read policies/holds, and snapshots.
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    /// Reconciles the state recorded in `txn` with `init_ids`, creating
    /// state for any collections that don't have it yet.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Applies updated storage parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the metadata of the identified collection.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns the metadata of all collections that are not dropped.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the identified collection.
    ///
    /// Convenience wrapper around [`Self::collections_frontiers`].
    fn collection_frontiers(&self, id: GlobalId) -> Result<CollectionFrontiers, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Returns the frontiers of all identified collections, failing if any
    /// of them is unknown.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers>, CollectionMissing>;

    /// Returns the frontiers of all collections that are not dropped.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers>;

    /// Checks that a collection with the given id exists.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError>;

    /// Returns aggregate statistics about the contents of the collection at
    /// `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError>;

    /// Returns statistics about the individual parts backing the collection
    /// at `as_of`.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>>;

    /// Returns the contents of the collection as of `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>>;

    /// Returns the contents of the collection at the most recent readable
    /// timestamp.
    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError>;

    /// Returns a [`SnapshotCursor`] over the contents of the collection as
    /// of `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>>;

    /// Returns a stream of the contents of the collection as of `as_of`.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    >;

    /// Returns a builder for writing timestampless updates to the identified
    /// collection.
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
    >;

    /// Adjusts the collection/shard state recorded in `txn`: adds and drops
    /// collection metadata and registers pre-existing shards.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError>;

    /// Creates the described collections as part of bootstrap, optionally
    /// registering them at `register_ts`.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Evolves an existing table's relation description to `new_desc` under
    /// a new collection id, at the expected relation version.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError>;

    /// Drops the identified collections without validating the identifiers.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Installs the given read policies, which govern how collections' read
    /// frontiers relate to their write frontiers.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>);

    /// Acquires read holds on the given collections, failing if any of them
    /// is unknown.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold>, CollectionMissing>;

    /// Determines the [`TimeDependence`] of the identified collection, if
    /// any.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    /// Renders introspection state as JSON, for debugging.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
304
/// A cursor over the contents of a snapshot, allowing the caller to fetch it
/// incrementally.
pub struct SnapshotCursor {
    // Kept alive alongside the cursor; presumably the read handle's lease
    // must outlive the cursor for reads to stay valid — confirm against the
    // persist `Cursor` contract.
    pub _read_handle: ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    /// The underlying persist cursor yielding the snapshot's updates.
    pub cursor: Cursor<SourceData, (), Timestamp, StorageDiff>,
}
313
314impl SnapshotCursor {
315 pub async fn next(
316 &mut self,
317 ) -> Option<impl Iterator<Item = (SourceData, Timestamp, StorageDiff)> + Sized + '_> {
318 let iter = self.cursor.next().await?;
319 Some(iter.map(|((k, ()), t, d)| (k, t, d)))
320 }
321}
322
/// Frontier state of a single collection, as reported by
/// [`StorageCollections`].
#[derive(Debug)]
pub struct CollectionFrontiers {
    /// The id of the collection these frontiers belong to.
    pub id: GlobalId,

    /// The collection's write frontier (upper).
    pub write_frontier: Antichain<Timestamp>,

    /// The read frontier maintained on the collection's behalf, as computed
    /// from its read policy.
    pub implied_capability: Antichain<Timestamp>,

    /// The frontier of all outstanding read capabilities on the collection,
    /// including the implied capability.
    pub read_capabilities: Antichain<Timestamp>,
}
344
/// Implementation of [`StorageCollections`] backed by persist and a pair of
/// background tasks.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl {
    /// This process's epoch, used to fence out older processes when claiming
    /// critical since handles.
    envd_epoch: NonZeroI64,

    /// Whether this instance is read-only: leased (not critical) since
    /// handles, and no shard finalization writes.
    read_only: bool,

    /// Shards that are candidates for finalization.
    finalizable_shards: Arc<ShardIdSet>,

    /// Shards that have been finalized but whose finalization has not yet
    /// been recorded in durable storage metadata.
    finalized_shards: Arc<ShardIdSet>,

    /// In-memory state of all known collections, shared with the background
    /// task.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,

    /// Read-side handle to the txn-wal shard.
    txns_read: TxnsRead<Timestamp>,

    /// Storage configuration, shared with the background tasks.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn-wal shard as observed when this instance was
    /// created.
    initial_txn_upper: Antichain<Timestamp>,

    /// Location of the persist instance backing our shards.
    persist_location: PersistLocation,

    /// Cache of persist clients.
    persist: Arc<PersistClientCache>,

    /// Sends commands to the background task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd>,

    /// Sends read-hold changes to the background task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<Timestamp>)>,

    /// Handle to the background task; aborts the task when the last clone is
    /// dropped.
    _background_task: Arc<AbortOnDropHandle<()>>,
    /// Handle to the shard finalization task; aborts the task when the last
    /// clone is dropped.
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}
405
/// A boxed stream of updates from a storage collection.
type SourceDataStream = BoxStream<'static, (SourceData, Timestamp, StorageDiff)>;
415
416impl StorageCollectionsImpl {
    /// Creates and initializes a new instance, spawning the background task
    /// and the shard finalization task.
    ///
    /// Panics if the txn-wal shard has not been recorded in `txn` by a prior
    /// initialization step.
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        // The txn-wal shard must have been created and recorded before we
        // are constructed.
        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Opened for its side effects and then discarded; NOTE(review):
        // presumably this ensures the txns shard exists/is initialized —
        // confirm.
        let _txns_handle: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow> =
            TxnsHandle::open(
                Timestamp::MIN,
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;

        // Write handle on the txns shard: used below to fetch its recent
        // upper, then handed to the background task.
        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        // Shared state handed to the background tasks below.
        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            // Wrap the task handles so they are aborted once every clone of
            // `self` has been dropped.
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }
544
    /// Opens the persist handles for a collection's data shard: a write
    /// handle plus a since handle.
    ///
    /// In read-only mode the since handle is a leased read handle; otherwise
    /// we first upgrade the shard's schema version and then claim the
    /// critical since handle.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        SinceHandleWrapper,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            // Ensure the shard's schema version is current before taking the
            // critical handle; only allowed outside read-only mode.
            persist_client
                .upgrade_version::<SourceData, (), Timestamp, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // Make sure the write handle has observed a recent upper before we
        // hand it out.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }
607
608 async fn open_write_handle(
610 &self,
611 id: &GlobalId,
612 shard: ShardId,
613 relation_desc: RelationDesc,
614 persist_client: &PersistClient,
615 ) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
616 let diagnostics = Diagnostics {
617 shard_name: id.to_string(),
618 handle_purpose: format!("controller data for {}", id),
619 };
620
621 let write = persist_client
622 .open_writer(
623 shard,
624 Arc::new(relation_desc),
625 Arc::new(UnitSchema),
626 diagnostics.clone(),
627 )
628 .await
629 .expect("invalid persist usage");
630
631 write
632 }
633
    /// Opens the critical since handle for the given shard and claims it for
    /// our epoch.
    ///
    /// Halts the process if a newer epoch already owns the handle, and
    /// panics if called in read-only mode.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), Timestamp, StorageDiff> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // Open the handle, then take ownership of it for our epoch via
        // compare-and-downgrade, retrying on benign races.
        let since_handle = {
            let mut handle = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&PersistEpoch::default()),
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // The since we install: the provided since (or the minimum
            // timestamp when none is given), joined with whatever the handle
            // already holds.
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(Timestamp::MIN),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().decode();

                // We may take over if the current owner's epoch is no newer
                // than ours, or if there is no owner yet.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Atomically claim the handle for our epoch while
                    // downgrading to `since`; a failed CaS means someone
                    // changed the opaque concurrently, so loop and re-check.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &Opaque::encode(&current_epoch),
                            (&Opaque::encode(&PersistEpoch::from(our_epoch)), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    // A newer process owns the handle: we've been fenced out.
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
713
714 async fn open_leased_handle(
720 &self,
721 id: &GlobalId,
722 shard: ShardId,
723 relation_desc: RelationDesc,
724 since: Option<&Antichain<Timestamp>>,
725 persist_client: &PersistClient,
726 ) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
727 tracing::debug!(%id, ?since, "opening leased handle");
728
729 let diagnostics = Diagnostics {
730 shard_name: id.to_string(),
731 handle_purpose: format!("controller data for {}", id),
732 };
733
734 let use_critical_since = false;
735 let mut handle: ReadHandle<_, _, _, _> = persist_client
736 .open_leased_reader(
737 shard,
738 Arc::new(relation_desc),
739 Arc::new(UnitSchema),
740 diagnostics.clone(),
741 use_critical_since,
742 )
743 .await
744 .expect("invalid persist usage");
745
746 let provided_since = match since {
750 Some(since) => since,
751 None => &Antichain::from_elem(Timestamp::MIN),
752 };
753 let since = handle.since().join(provided_since);
754
755 handle.downgrade_since(&since).await;
756
757 handle
758 }
759
760 fn register_handles(
761 &self,
762 id: GlobalId,
763 is_in_txns: bool,
764 since_handle: SinceHandleWrapper,
765 write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
766 ) {
767 self.send(BackgroundCmd::Register {
768 id,
769 is_in_txns,
770 since_handle,
771 write_handle,
772 });
773 }
774
775 fn send(&self, cmd: BackgroundCmd) {
776 let _ = self.cmd_tx.send(cmd);
777 }
778
779 async fn snapshot_stats_inner(
780 &self,
781 id: GlobalId,
782 as_of: SnapshotStatsAsOf,
783 ) -> Result<SnapshotStats, StorageError> {
784 let (tx, rx) = oneshot::channel();
791 self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
792 rx.await.expect("BackgroundTask should be live").0.await
793 }
794
    /// Installs read capabilities on all storage dependencies of the
    /// identified collection, at the collection's implied capability.
    ///
    /// A no-op for collections we don't know about.
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState>,
        id: GlobalId,
    ) -> Result<(), StorageError> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            // Invariant check: a dependency's since must not be in advance
            // of the dependent's since, otherwise the holds installed below
            // would not be valid.
            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }
839
840 fn determine_collection_dependencies(
842 self_collections: &BTreeMap<GlobalId, CollectionState>,
843 source_id: GlobalId,
844 collection_desc: &CollectionDescription,
845 ) -> Result<Vec<GlobalId>, StorageError> {
846 let mut dependencies = Vec::new();
847
848 if let Some(id) = collection_desc.primary {
849 dependencies.push(id);
850 }
851
852 match &collection_desc.data_source {
853 DataSource::Introspection(_)
854 | DataSource::Webhook
855 | DataSource::Table
856 | DataSource::Progress
857 | DataSource::Other => (),
858 DataSource::IngestionExport {
859 ingestion_id,
860 data_config,
861 ..
862 } => {
863 let source = self_collections
866 .get(ingestion_id)
867 .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
868 let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
869 panic!("SourceExport must refer to a primary source that already exists");
870 };
871
872 match data_config.envelope {
873 SourceEnvelope::CdcV2 => (),
874 _ => dependencies.push(*remap_collection_id),
875 }
876 }
877 DataSource::Ingestion(ingestion) => {
879 if ingestion.remap_collection_id != source_id {
880 dependencies.push(ingestion.remap_collection_id);
881 }
882 }
883 DataSource::Sink { desc } => dependencies.push(desc.sink.from),
884 }
885
886 Ok(dependencies)
887 }
888
889 #[instrument(level = "debug")]
891 fn install_read_capabilities_inner(
892 &self,
893 self_collections: &mut BTreeMap<GlobalId, CollectionState>,
894 from_id: GlobalId,
895 storage_dependencies: &[GlobalId],
896 read_capability: Antichain<Timestamp>,
897 ) -> Result<(), StorageError> {
898 let mut changes = ChangeBatch::new();
899 for time in read_capability.iter() {
900 changes.update(*time, 1);
901 }
902
903 if tracing::span_enabled!(tracing::Level::TRACE) {
904 let user_capabilities = self_collections
906 .iter_mut()
907 .filter(|(id, _c)| id.is_user())
908 .map(|(id, c)| {
909 let updates = c.read_capabilities.updates().cloned().collect_vec();
910 (*id, c.implied_capability.clone(), updates)
911 })
912 .collect_vec();
913
914 trace!(
915 %from_id,
916 ?storage_dependencies,
917 ?read_capability,
918 ?user_capabilities,
919 "install_read_capabilities_inner");
920 }
921
922 let mut storage_read_updates = storage_dependencies
923 .iter()
924 .map(|id| (*id, changes.clone()))
925 .collect();
926
927 StorageCollectionsImpl::update_read_capabilities_inner(
928 &self.cmd_tx,
929 self_collections,
930 &mut storage_read_updates,
931 );
932
933 if tracing::span_enabled!(tracing::Level::TRACE) {
934 let user_capabilities = self_collections
936 .iter_mut()
937 .filter(|(id, _c)| id.is_user())
938 .map(|(id, c)| {
939 let updates = c.read_capabilities.updates().cloned().collect_vec();
940 (*id, c.implied_capability.clone(), updates)
941 })
942 .collect_vec();
943
944 trace!(
945 %from_id,
946 ?storage_dependencies,
947 ?read_capability,
948 ?user_capabilities,
949 "after install_read_capabilities_inner!");
950 }
951
952 Ok(())
953 }
954
955 async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<Timestamp>, StorageError> {
956 let metadata = &self.collection_metadata(id)?;
957 let persist_client = self
958 .persist
959 .open(metadata.persist_location.clone())
960 .await
961 .unwrap();
962 let diagnostics = Diagnostics {
965 shard_name: id.to_string(),
966 handle_purpose: format!("controller data for {}", id),
967 };
968 let write = persist_client
971 .open_writer::<SourceData, (), Timestamp, StorageDiff>(
972 metadata.data_shard,
973 Arc::new(metadata.relation_desc.clone()),
974 Arc::new(UnitSchema),
975 diagnostics.clone(),
976 )
977 .await
978 .expect("invalid persist usage");
979 Ok(write.shared_upper())
980 }
981
982 async fn read_handle_for_snapshot(
983 persist: Arc<PersistClientCache>,
984 metadata: &CollectionMetadata,
985 id: GlobalId,
986 ) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
987 let persist_client = persist
988 .open(metadata.persist_location.clone())
989 .await
990 .unwrap();
991
992 let read_handle = persist_client
998 .open_leased_reader::<SourceData, (), _, _>(
999 metadata.data_shard,
1000 Arc::new(metadata.relation_desc.clone()),
1001 Arc::new(UnitSchema),
1002 Diagnostics {
1003 shard_name: id.to_string(),
1004 handle_purpose: format!("snapshot {}", id),
1005 },
1006 USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1007 )
1008 .await
1009 .expect("invalid persist usage");
1010 Ok(read_handle)
1011 }
1012
    /// Returns the contents of the identified collection at `as_of`, reading
    /// through the txn-wal machinery when the collection is registered with
    /// the txns shard.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
        txns_read: &TxnsRead<Timestamp>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // Only use the txns path for collections registered with the txns
        // shard, and sanity-check it is the shard we expect.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // Non-txns collections can be read directly from persist.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // Wait for the txns shard to pass `as_of`, then take a
                    // data snapshot that makes the data shard readable at
                    // `as_of`.
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // Updates may carry errors persisted in the shard
                        // itself; surface those to the caller via `?`.
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                // A fetch error means the shard's since has advanced past
                // `as_of`.
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }
1072
    /// Like [`Self::snapshot`], but returns a stream of updates instead of a
    /// fully materialized `Vec`.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
        txns_read: &TxnsRead<Timestamp>,
    ) -> BoxFuture<'static, Result<SourceDataStream, StorageError>> {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // Only use the txns path for collections registered with the txns
        // shard, and sanity-check it is the shard we expect.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    // Non-txns collections stream directly from persist; an
                    // error means the since has advanced past `as_of`.
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    // Wait for the txns shard to pass `as_of`, then stream
                    // through a data snapshot.
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            // Strip the (unit) value from each update.
            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
    }
1119
    /// Applies the given read policies, adjusting each collection's implied
    /// capability to the frontier its policy computes from the current write
    /// frontier.
    ///
    /// Panics if a policy references an unknown collection.
    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState>,
        policies: Vec<(GlobalId, ReadPolicy)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            // Only ever advance the implied capability, never let it regress.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                // Swap in the new capability and record the net +1/-1
                // changes relative to the old one (which is in
                // `new_read_capability` after the swap).
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            // Remember the policy so future write-frontier updates can apply
            // it.
            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }
1166
    /// Applies a batch of read-capability changes, propagating each
    /// collection's effective changes to its storage dependencies, and
    /// forwards the resulting since frontiers to the background task as
    /// persist compaction commands.
    ///
    /// Removes a collection's in-memory state once its overall read
    /// frontier becomes the empty antichain.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd>,
        collections: &mut BTreeMap<GlobalId, CollectionState>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<Timestamp>>,
    ) {
        // Net changes and resulting read frontier, per collection.
        let mut collections_net = BTreeMap::new();

        // Drain `updates` from the largest id downwards. NOTE(review): this
        // presumably ensures dependents are processed before the
        // dependencies they propagate updates to (inserted back into
        // `updates` below) — confirm against the id-ordering invariant
        // between collections and their dependencies.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                // A dropped collection may still receive hold releases
                // (negative updates), which we ignore; installing new holds
                // on an absent collection is a bug.
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    continue;
                }
            };

            // Sanity checks: the update must neither drive any capability
            // count negative nor install a capability before the current
            // read frontier.
            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                    read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                        install read capabilities before the current \
                        frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            // Apply the changes; afterwards `update` holds the net frontier
            // changes that actually took effect.
            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                    %id,
                    ?collection.storage_dependencies,
                    ?update,
                    "forwarding update to storage dependencies");
            }

            // Propagate the effective changes to all dependencies; they get
            // picked up by later iterations of this loop.
            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate the net changes into persist compaction commands for the
        // background task.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                let collection = collections.get(&key).expect("must still exist");
                // Collections with a primary collection don't drive persist
                // compaction themselves.
                let should_emit_persist_compaction = collection.primary.is_none();

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
1279
    /// Drops shards from our in-memory finalized set once they are no longer
    /// listed as unfinalized in the durable storage metadata, i.e. once
    /// their finalization has been recorded.
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
1286}
1287
1288#[async_trait]
1290impl StorageCollections for StorageCollectionsImpl {
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        // Only prepare state for collections that don't have metadata yet.
        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        // Every shard the txn still records as unfinalized becomes a
        // candidate for finalization.
        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }
1326
1327 fn update_parameters(&self, config_params: StorageParameters) {
1328 config_params.dyncfg_updates.apply(self.persist.cfg());
1331
1332 self.config
1333 .lock()
1334 .expect("lock poisoned")
1335 .update(config_params);
1336 }
1337
1338 fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1339 let collections = self.collections.lock().expect("lock poisoned");
1340
1341 collections
1342 .get(&id)
1343 .map(|c| c.collection_metadata.clone())
1344 .ok_or(CollectionMissing(id))
1345 }
1346
1347 fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1348 let collections = self.collections.lock().expect("lock poisoned");
1349
1350 collections
1351 .iter()
1352 .filter(|(_id, c)| !c.is_dropped())
1353 .map(|(id, c)| (*id, c.collection_metadata.clone()))
1354 .collect()
1355 }
1356
1357 fn collections_frontiers(
1358 &self,
1359 ids: Vec<GlobalId>,
1360 ) -> Result<Vec<CollectionFrontiers>, CollectionMissing> {
1361 if ids.is_empty() {
1362 return Ok(vec![]);
1363 }
1364
1365 let collections = self.collections.lock().expect("lock poisoned");
1366
1367 let res = ids
1368 .into_iter()
1369 .map(|id| {
1370 collections
1371 .get(&id)
1372 .map(|c| CollectionFrontiers {
1373 id: id.clone(),
1374 write_frontier: c.write_frontier.clone(),
1375 implied_capability: c.implied_capability.clone(),
1376 read_capabilities: c.read_capabilities.frontier().to_owned(),
1377 })
1378 .ok_or(CollectionMissing(id))
1379 })
1380 .collect::<Result<Vec<_>, _>>()?;
1381
1382 Ok(res)
1383 }
1384
1385 fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers> {
1386 let collections = self.collections.lock().expect("lock poisoned");
1387
1388 let res = collections
1389 .iter()
1390 .filter(|(_id, c)| !c.is_dropped())
1391 .map(|(id, c)| CollectionFrontiers {
1392 id: id.clone(),
1393 write_frontier: c.write_frontier.clone(),
1394 implied_capability: c.implied_capability.clone(),
1395 read_capabilities: c.read_capabilities.frontier().to_owned(),
1396 })
1397 .collect_vec();
1398
1399 res
1400 }
1401
    /// Returns aggregate statistics about the snapshot of collection `id` at
    /// `as_of`.
    ///
    /// For collections whose data lives in the txns shard, `as_of` is first
    /// translated into a txn-wal `DataSnapshot`; otherwise the persist shard
    /// is consulted directly.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let metadata = self.collection_metadata(id)?;

        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                // The only txns shard we track is `self.txns_read`'s; anything
                // else is a bug.
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                // Wait until the txns shard has progressed past `as_of` so
                // the data snapshot below is well-defined.
                self.txns_read.update_gt(as_of).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of)
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }
1428
    /// Returns per-part statistics about the snapshot of collection `id` at
    /// `as_of`.
    ///
    /// All work that needs `self` (metadata lookup, opening the read handle,
    /// txns-shard translation) happens before the returned future is created,
    /// so the future is `'static` and can be awaited independently.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                // Return the error as a ready future so the return type stays
                // uniform.
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // For txn-backed collections, translate `as_of` through the txns
        // shard; otherwise the shard is read directly below.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                // The only txns shard we track is `self.txns_read`'s.
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(*as_of).await;
                let data_snapshot = self.txns_read.data_snapshot(data_shard, *as_of).await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // Proactively return the read handle's lease.
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1480
    /// Returns the contents of collection `id` at `as_of`, delegating to the
    /// inherent snapshot implementation with our txns-shard reader.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        self.snapshot(id, as_of, &self.txns_read)
    }
1488
    /// Returns the rows of collection `id` as of the most recent readable
    /// time, i.e. one step back from the current upper.
    ///
    /// Returns an `InvalidUsage` error when the upper is the empty antichain
    /// (the collection is closed and has no "latest" time).
    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &Timestamp::MIN => {
                // `f > MIN` guarantees `step_back` yields `Some`.
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        // A "latest" snapshot must accumulate to a set: every
                        // row with multiplicity exactly one.
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                // Upper still at the minimum timestamp: nothing is readable
                // yet, so the collection is trivially empty.
                Vec::new()
            }
            _ => {
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }
1520
1521 fn snapshot_cursor(
1522 &self,
1523 id: GlobalId,
1524 as_of: Timestamp,
1525 ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>> {
1526 let metadata = match self.collection_metadata(id) {
1527 Ok(metadata) => metadata.clone(),
1528 Err(e) => return async { Err(e.into()) }.boxed(),
1529 };
1530 let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1531 assert_eq!(txns_id, self.txns_read.txns_id());
1534 self.txns_read.clone()
1535 });
1536 let persist = Arc::clone(&self.persist);
1537
1538 async move {
1540 let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1541 let cursor = match txns_read {
1542 None => {
1543 let cursor = handle
1544 .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1545 .await
1546 .map_err(|_| StorageError::ReadBeforeSince(id))?;
1547 SnapshotCursor {
1548 _read_handle: handle,
1549 cursor,
1550 }
1551 }
1552 Some(txns_read) => {
1553 txns_read.update_gt(as_of).await;
1554 let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
1555 let cursor = data_snapshot
1556 .snapshot_cursor(&mut handle, |_| true)
1557 .await
1558 .map_err(|_| StorageError::ReadBeforeSince(id))?;
1559 SnapshotCursor {
1560 _read_handle: handle,
1561 cursor,
1562 }
1563 }
1564 };
1565
1566 Ok(cursor)
1567 }
1568 .boxed()
1569 }
1570
    /// Streams the snapshot of collection `id` at `as_of` as
    /// `(data, time, diff)` updates, delegating to the inherent
    /// implementation with our txns-shard reader.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    > {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }
1581
    /// Returns a builder for staging timestampless updates destined for
    /// collection `id`, backed by a fresh persist write handle on the
    /// collection's data shard.
    ///
    /// The returned future is `'static` and owns everything it needs.
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            // Return the error as a ready future so the return type stays
            // uniform.
            Err(e) => return Box::pin(async move { Err(e.into()) }),
        };
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }
1618
1619 fn check_exists(&self, id: GlobalId) -> Result<(), StorageError> {
1620 let collections = self.collections.lock().expect("lock poisoned");
1621
1622 if collections.contains_key(&id) {
1623 Ok(())
1624 } else {
1625 Err(StorageError::IdentifierMissing(id))
1626 }
1627 }
1628
    /// Prepares the durable storage state in `txn` for the given collection
    /// changes: allocates fresh shards for new collections, registers
    /// explicitly provided shard mappings, records dropped shards as
    /// unfinalized, and marks already-finalized shards as such.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError> {
        // Brand-new collections each get a freshly allocated shard.
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        // Remove the metadata of dropped collections; we get back their
        // (id, shard) mappings.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let mut dropped_shards = BTreeSet::new();
        {
            let collections = self.collections.lock().expect("poisoned");
            for (id, shard) in dropped_mappings {
                let coll = collections.get(&id).expect("must exist");
                // Only shards of collections without a `primary` are queued
                // for finalization; presumably a primary collection still
                // uses the shard -- TODO confirm against `alter_table_desc`.
                if coll.primary.is_none() {
                    dropped_shards.insert(shard);
                }
            }
        }
        txn.insert_unfinalized_shards(dropped_shards)?;

        // Record the shards we have finished finalizing.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }
1668
    /// Creates (registers) the given collections during bootstrap.
    ///
    /// Opens persist handles for every collection concurrently, optionally
    /// advances fresh table shards to `register_ts`, and then installs
    /// in-memory `CollectionState` for each collection in dependency order.
    ///
    /// Returns `CollectionIdReused` if the same ID appears twice with
    /// differing descriptions.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        // A collection counts as txn-backed unless we are read-only and the
        // collection was migrated (in which case we must not treat it as
        // being in the txns shard).
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Dedup exact duplicates; any remaining duplicate ID (with a
        // different description) is an error.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // Pair each collection with its durable metadata (shard mapping etc.).
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        // Open handles for all collections concurrently (bounded at 50
        // in-flight) — opening serially would dominate bootstrap time.
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError>| {
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    // Collections with a `primary` do not get an explicit
                    // since; otherwise use the description's since, if any.
                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            // Fresh (never-compacted) non-migrated table
                            // shards get their since advanced to the
                            // registration timestamp.
                            if since_handle.since().elements() == &[Timestamp::MIN]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts)),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError>((id, description, write, since_handle, metadata))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        // Registration order: tables (newest first), then other collections,
        // then sinks, so dependencies are present before their dependents.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            Table(Reverse<GlobalId>),
            Collection(GlobalId),
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            // The initial since is the shard's since, joined up to the
            // dependency's implied capability when the latter is further
            // along.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // Sanity check: a dependency's since should not have
                        // run past a (non-empty, non-fresh) dependent's upper.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[Timestamp::MIN]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            // Derive how this collection's timestamps relate to wall-clock
            // time; only EpochMilliseconds timelines can be wall-clock bound.
            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                LoadGenerator(_) => None,
                            }
                        }
                        IngestionExport { ingestion_id, .. } => {
                            // Exports inherit from their parent ingestion,
                            // which sorts earlier and is already registered.
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            match &description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport { .. } => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    // Txn-backed tables report their upper via the txns
                    // shard, so their write frontier is at least the initial
                    // txns upper.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            // Hand the persist handles to the background task for frontier
            // tracking and since management.
            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2014
    /// Alters a table by evolving its persist schema in place and registering
    /// `new_collection` as the new primary collection on the same data shard.
    ///
    /// The existing collection is retained: it records `new_collection` as
    /// its `primary` and gains a dependency (and read hold) on it.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError> {
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        // Evolve the shard's schema, guarded by the expected version.
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        match schema_result {
            CaESchema::Ok(id) => id,
            // Both failure cases are unexpected: soft-panic in development,
            // surface a generic error in production.
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // A collection may only be altered once into a new primary.
            assert_none!(existing.primary);

            existing.primary = Some(new_collection);
            existing.storage_dependencies.push(new_collection);

            // The new collection inherits the existing collection's overall
            // read frontier and write frontier.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            // Install a capability change that backs the new collection's
            // implied capability.
            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (*t, 1)));

            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: new_desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                None,
                existing.time_dependence.clone(),
                existing.ingestion_remap_collection_id.clone(),
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }
2167
    /// Drops the given collections without validating the request, by setting
    /// each one's read policy to `ValidFrom(empty)`, which releases its
    /// implied capability.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        let mut finalized_policies = Vec::new();

        for id in identifiers {
            // Unknown collections are silently skipped.
            let Some(collection) = self_collections.get(&id) else {
                continue;
            };

            if collection.primary.is_none() {
                // The caller should already have removed the shard mapping
                // via `prepare_state`; log loudly if not.
                let metadata = storage_metadata.get_collection_shard(id);
                mz_ore::soft_assert_or_log!(
                    matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                    "dropping {id}, but drop was not synchronized with storage \
                    controller via `prepare_state`"
                );
            }

            finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
        }

        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }
2212
2213 fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>) {
2214 let mut collections = self.collections.lock().expect("lock poisoned");
2215
2216 if tracing::enabled!(tracing::Level::TRACE) {
2217 let user_capabilities = collections
2218 .iter_mut()
2219 .filter(|(id, _c)| id.is_user())
2220 .map(|(id, c)| {
2221 let updates = c.read_capabilities.updates().cloned().collect_vec();
2222 (*id, c.implied_capability.clone(), updates)
2223 })
2224 .collect_vec();
2225
2226 trace!(?policies, ?user_capabilities, "set_read_policies");
2227 }
2228
2229 self.set_read_policies_inner(&mut collections, policies);
2230
2231 if tracing::enabled!(tracing::Level::TRACE) {
2232 let user_capabilities = collections
2233 .iter_mut()
2234 .filter(|(id, _c)| id.is_user())
2235 .map(|(id, c)| {
2236 let updates = c.read_capabilities.updates().cloned().collect_vec();
2237 (*id, c.implied_capability.clone(), updates)
2238 })
2239 .collect_vec();
2240
2241 trace!(?user_capabilities, "after! set_read_policies");
2242 }
2243 }
2244
    /// Acquires a [`ReadHold`] on each of the desired collections, at each
    /// collection's current overall read frontier.
    ///
    /// Returns `CollectionMissing` for the first unknown ID; in that case no
    /// holds are installed.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold>, CollectionMissing> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        // First validate every ID and pick the frontier each hold gets, so
        // that we fail before mutating any capabilities.
        let mut advanced_holds = Vec::new();
        for id in desired_holds.iter() {
            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        // Install the capability changes that back the new holds.
        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (*time, 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        // Dropping a returned hold reports its release via `holds_tx`.
        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }
2296
2297 fn determine_time_dependence(
2299 &self,
2300 id: GlobalId,
2301 ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2302 use TimeDependenceError::CollectionMissing;
2303 let collections = self.collections.lock().expect("lock poisoned");
2304 let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2305 Ok(state.time_dependence.clone())
2306 }
2307
    /// Renders this instance's state as JSON, for debugging/introspection.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Exhaustively destructure `self` so that adding a field without
        // updating `dump` becomes a compile error.
        let Self {
            envd_epoch,
            read_only,
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read: _,
            config,
            initial_txn_upper,
            persist_location,
            persist: _,
            cmd_tx: _,
            holds_tx: _,
            _background_task: _,
            _finalize_shards_task: _,
        } = self;

        let finalizable_shards: Vec<_> = finalizable_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let finalized_shards: Vec<_> = finalized_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        // Collection state has no Serialize impl; render via Debug.
        let collections: BTreeMap<_, _> = collections
            .lock()
            .expect("poisoned")
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let config = format!("{:?}", config.lock().expect("poisoned"));

        Ok(serde_json::json!({
            "envd_epoch": envd_epoch,
            "read_only": read_only,
            "finalizable_shards": finalizable_shards,
            "finalized_shards": finalized_shards,
            "collections": collections,
            "config": config,
            "initial_txn_upper": initial_txn_upper,
            "persist_location": format!("{persist_location:?}"),
        }))
    }
2356}
2357
/// A shard `since` handle that is either a critical since handle or a leased
/// read handle, presenting a unified API over both variants.
#[derive(Debug)]
enum SinceHandleWrapper {
    // Critical handle, fenced by an opaque token.
    Critical(SinceHandle<SourceData, (), Timestamp, StorageDiff>),
    // Leased read handle; carries no opaque token.
    Leased(ReadHandle<SourceData, (), Timestamp, StorageDiff>),
}
2369
impl SinceHandleWrapper {
    /// The current since frontier of the underlying handle.
    pub fn since(&self) -> &Antichain<Timestamp> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    /// The current opaque fencing token. Leased handles have no opaque, so
    /// they report `PersistEpoch(None)`.
    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().decode(),
            Self::Leased(_handle) => {
                PersistEpoch(None)
            }
        }
    }

    /// Downgrades the since to `since`. For critical handles this is fenced
    /// by the `expected` opaque token; leased handles downgrade directly
    /// (and assert that no opaque token was supplied).
    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
    ) -> Result<Antichain<Timestamp>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle
                .compare_and_downgrade_since(
                    &Opaque::encode(expected),
                    (&Opaque::encode(opaque), since),
                )
                .await
                .map_err(|e| e.decode()),
            Self::Leased(handle) => {
                // Leased handles carry no fencing token.
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    /// Like [`Self::compare_and_downgrade_since`], but the "maybe" variants
    /// of the underlying calls are used; a critical handle may return `None`
    /// when no downgrade was attempted.
    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
    ) -> Option<Result<Antichain<Timestamp>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => handle
                .maybe_compare_and_downgrade_since(
                    &Opaque::encode(expected),
                    (&Opaque::encode(opaque), since),
                )
                .await
                .map(|r| r.map_err(|o| o.decode())),
            Self::Leased(handle) => {
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    /// Snapshot statistics at `as_of`, mapping persist errors to
    /// `ReadBeforeSince`.
    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<Timestamp>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    /// Snapshot statistics for a txn-wal [`DataSnapshot`], mapping persist
    /// errors to `ReadBeforeSince`.
    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}
2477
/// In-memory state tracked for a single storage collection.
#[derive(Debug, Clone)]
struct CollectionState {
    // If set, the ID of the collection that superseded this one and now acts
    // as the primary user of the shard (set by `alter_table_desc`).
    primary: Option<GlobalId>,

    // How this collection's timestamps relate to wall-clock time, if at all.
    time_dependence: Option<TimeDependence>,
    // For ingestions, the ID of their remap collection.
    ingestion_remap_collection_id: Option<GlobalId>,

    // Accumulation of all outstanding read capabilities (holds) on this
    // collection; its frontier is the collection's overall read frontier.
    pub read_capabilities: MutableAntichain<Timestamp>,

    // The read capability this collection holds on its own behalf, governed
    // by `read_policy`.
    pub implied_capability: Antichain<Timestamp>,

    // Policy determining how `implied_capability` evolves.
    pub read_policy: ReadPolicy,

    // Collections this one depends on (and holds read capabilities for).
    pub storage_dependencies: Vec<GlobalId>,

    // Most recently reported upper of the collection.
    pub write_frontier: Antichain<Timestamp>,

    // Durable metadata: persist location, shards, relation schema.
    pub collection_metadata: CollectionMetadata,
}
2517
2518impl CollectionState {
2519 pub fn new(
2522 primary: Option<GlobalId>,
2523 time_dependence: Option<TimeDependence>,
2524 ingestion_remap_collection_id: Option<GlobalId>,
2525 since: Antichain<Timestamp>,
2526 write_frontier: Antichain<Timestamp>,
2527 storage_dependencies: Vec<GlobalId>,
2528 metadata: CollectionMetadata,
2529 ) -> Self {
2530 let mut read_capabilities = MutableAntichain::new();
2531 read_capabilities.update_iter(since.iter().map(|time| (*time, 1)));
2532 Self {
2533 primary,
2534 time_dependence,
2535 ingestion_remap_collection_id,
2536 read_capabilities,
2537 implied_capability: since.clone(),
2538 read_policy: ReadPolicy::NoPolicy {
2539 initial_since: since,
2540 },
2541 storage_dependencies,
2542 write_frontier,
2543 collection_metadata: metadata,
2544 }
2545 }
2546
2547 pub fn is_dropped(&self) -> bool {
2549 self.read_capabilities.is_empty()
2550 }
2551}
2552
/// State of the background task that tracks shard uppers, manages since
/// handles, and applies read-capability changes arriving over channels.
#[derive(Debug)]
struct BackgroundTask {
    config: Arc<Mutex<StorageConfiguration>>,
    // Sender side of `cmds_rx`; used when capability updates need to enqueue
    // follow-up commands.
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd>,
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd>,
    // Read-hold changes reported by `ReadHold`s.
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<Timestamp>)>,
    finalizable_shards: Arc<ShardIdSet>,
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,
    // The shard each collection's registered write handle belongs to; used to
    // detect stale handles after re-registration.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper>,
    // Write handle on the txns shard, if any; taken by `run`.
    txns_handle: Option<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    // Collections whose uppers advance with the txns shard's upper.
    txns_shards: BTreeSet<GlobalId>,
}
2573
/// Commands processed by the [`BackgroundTask`].
#[derive(Debug)]
enum BackgroundCmd {
    // Registers write/since handles for a collection so the task can track
    // its upper and manage its since.
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        since_handle: SinceHandleWrapper,
    },
    // Downgrades the since of the given collections to the given frontiers.
    DowngradeSince(Vec<(GlobalId, Antichain<Timestamp>)>),
    // Requests snapshot stats for a collection; the response future is sent
    // back on the provided channel.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf,
        oneshot::Sender<SnapshotStatsRes>,
    ),
}
2589
2590pub(crate) struct SnapshotStatsRes(BoxFuture<'static, Result<SnapshotStats, StorageError>>);
2592
impl Debug for SnapshotStatsRes {
    // The wrapped future is not `Debug`, so only the struct name is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}
2598
2599impl BackgroundTask {
    /// Main loop of the background task. Concurrently:
    ///
    /// * waits for the uppers of tracked shards (including the txns shard, if
    ///   any) to advance and reports the new write frontiers,
    /// * processes [`BackgroundCmd`]s, batching since downgrades,
    /// * applies read-hold changes arriving on `holds_rx`.
    ///
    /// Exits when the command channel closes.
    async fn run(&mut self) {
        // Futures that resolve once the upper of a tracked (non-txns) shard
        // advances past its previously observed value.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                        Output = (
                            GlobalId,
                            WriteHandle<SourceData, (), Timestamp, StorageDiff>,
                            Antichain<Timestamp>,
                        ),
                    > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<Timestamp>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        // The txns shard gets its own dedicated upper future; its progress
        // drives the write frontiers of all txn-backed collections. Without
        // a txns handle this future is forever pending.
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    // All txn-backed collections advance to the txns upper.
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    // Re-arm the txns upper future.
                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            // Keep watching unless the shard is closed.
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // The collection was re-registered with a
                            // different shard; this handle is stale.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        // Command channel closed: shut down.
                        break;
                    };

                    // Drain any already-queued commands so since downgrades
                    // can be batched below.
                    let commands = iter::once(cmd).chain(
                        iter::from_fn(|| self.cmds_rx.try_recv().ok())
                    );
                    let mut downgrades = BTreeMap::<_, Antichain<_>>::new();
                    for cmd in commands {
                        match cmd {
                            BackgroundCmd::Register{
                                id,
                                is_in_txns,
                                write_handle,
                                since_handle
                            } => {
                                debug!("registering handles for {}", id);
                                let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                                if previous.is_some() {
                                    panic!("already registered a WriteHandle for collection {id}");
                                }

                                let previous = self.since_handles.insert(id, since_handle);
                                if previous.is_some() {
                                    panic!("already registered a SinceHandle for collection {id}");
                                }

                                if is_in_txns {
                                    // Txn-backed collections piggy-back on the
                                    // txns shard upper; no per-shard future.
                                    self.txns_shards.insert(id);
                                } else {
                                    let upper = write_handle.upper().clone();
                                    if !upper.is_empty() {
                                        let fut = gen_upper_future(id, write_handle, upper);
                                        upper_futures.push(fut.boxed());
                                    }
                                }
                            }
                            BackgroundCmd::DowngradeSince(cmds) => {
                                // Join repeated downgrades for the same
                                // collection; applied in one batch below.
                                for (id, new) in cmds {
                                    downgrades.entry(id)
                                        .and_modify(|since| since.join_assign(&new))
                                        .or_insert(new);
                                }
                            }
                            BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                                // The stats future is handed back over the
                                // channel without being awaited here, so this
                                // loop never blocks on it.
                                let res = match self.since_handles.get(&id) {
                                    Some(x) => {
                                        let fut: BoxFuture<
                                            'static,
                                            Result<SnapshotStats, StorageError>,
                                        > = match as_of {
                                            SnapshotStatsAsOf::Direct(as_of) => {
                                                x.snapshot_stats(id, Some(as_of))
                                            }
                                            SnapshotStatsAsOf::Txns(data_snapshot) => {
                                                x.snapshot_stats_from_txn(id, data_snapshot)
                                            }
                                        };
                                        SnapshotStatsRes(fut)
                                    }
                                    None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                        StorageError::IdentifierMissing(id),
                                    )))),
                                };
                                // The requester may have gone away; ignore
                                // send errors.
                                let _ = tx.send(res);
                            }
                        }
                    }

                    if !downgrades.is_empty() {
                        self.downgrade_sinces(downgrades).await;
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    // Batch all pending hold changes before applying them.
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }
2797
    /// Applies new upper frontiers to the tracked collections and derives any
    /// resulting read-capability changes from each collection's read policy.
    ///
    /// For every `(id, new_upper)` pair this advances the collection's
    /// `write_frontier` (only ever forward), recomputes the implied read
    /// capability via the collection's `ReadPolicy`, and accumulates the
    /// per-collection capability deltas. All accumulated changes are then
    /// applied in a single call to `update_read_capabilities_inner`.
    ///
    /// Updates for collections that are no longer tracked (e.g. concurrently
    /// dropped) are skipped.
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<Timestamp>)]) {
        // Capability deltas keyed by collection, applied in bulk at the end.
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            // Only ever advance the write frontier; ignore stale uppers.
            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            // The read policy decides how far the implied read capability may
            // trail the (possibly just advanced) write frontier.
            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                // Record the delta between old and new capability: +1 for each
                // time in the new frontier, -1 for each time in the old one.
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                // After the swap, `new_read_capability` holds the *previous*
                // implied capability, whose times are retracted below.
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }
2852
    /// Downgrades the critical since handles of the given collections to the
    /// requested frontiers.
    ///
    /// Handles are temporarily removed from `self.since_handles`, downgraded
    /// concurrently via `join_all`, and re-inserted afterwards. If a downgrade
    /// reveals that another process took over (the opaque epoch changed), the
    /// process halts. If a since advances to the empty frontier, the
    /// collection's persist handles are dropped and its shard is (depending on
    /// the `finalize_shards` parameter) enqueued for finalization.
    async fn downgrade_sinces(&mut self, cmds: BTreeMap<GlobalId, Antichain<Timestamp>>) {
        let mut futures = Vec::with_capacity(cmds.len());
        for (id, new_since) in cmds {
            // Take ownership of the handle so the future below can move it;
            // the handle travels through the future's output and is put back
            // into `self.since_handles` afterwards.
            let Some(mut since_handle) = self.since_handles.remove(&id) else {
                // Can happen when the collection was concurrently dropped.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            let fut = async move {
                if id.is_user() {
                    trace!("downgrading since of {} to {:?}", id, new_since);
                }

                let epoch = since_handle.opaque().clone();
                let result = if new_since.is_empty() {
                    // A downgrade to the empty frontier finalizes the read
                    // capability, so force it through unconditionally.
                    Some(
                        since_handle
                            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                            .await,
                    )
                } else {
                    // Regular downgrades may be skipped (rate-limited) by the
                    // handle, hence the `maybe_` variant and `Option` result.
                    since_handle
                        .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await
                };
                (id, since_handle, result)
            };
            futures.push(fut);
        }

        for (id, since_handle, result) in futures::future::join_all(futures).await {
            let new_since = match result {
                Some(Ok(since)) => Some(since),
                // Someone else advanced the opaque epoch: we have been fenced.
                Some(Err(other_epoch)) => mz_ore::halt!(
                    "fenced by envd @ {other_epoch:?}. ours = {:?}",
                    since_handle.opaque(),
                ),
                // The handle decided to skip this downgrade; nothing changed.
                None => None,
            };

            // Re-insert the handle before deciding whether to drop it below.
            self.since_handles.insert(id, since_handle);

            if new_since.is_some_and(|s| s.is_empty()) {
                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let Some(dropped_shard_id) = self.shard_by_id.remove(&id) else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        %id, %dropped_shard_id,
                        "enqueuing shard finalization due to dropped collection and dropped \
                        persist handle",
                    );
                    self.finalizable_shards.lock().insert(dropped_shard_id);
                } else {
                    info!(
                        "not triggering shard finalization due to dropped storage object \
                        because enable_storage_shard_finalization parameter is false"
                    );
                }
            }
        }
    }
2938}
2939
/// Bundled arguments for [`finalize_shards_task`].
struct FinalizeShardsTaskConfig {
    // Epoch of this environment process; used to build the `PersistEpoch`
    // opaque passed when opening critical since handles during finalization.
    envd_epoch: NonZeroI64,
    // Shared storage configuration; consulted for the `finalize_shards`
    // parameter and dyncfg values on every tick.
    config: Arc<Mutex<StorageConfiguration>>,
    // Counters reporting finalization attempts and outcomes.
    metrics: StorageCollectionsMetrics,
    // Shards queued for finalization; retried until they succeed.
    finalizable_shards: Arc<ShardIdSet>,
    // Shards whose finalization has completed.
    finalized_shards: Arc<ShardIdSet>,
    // Location of the persist instance holding the shards.
    persist_location: PersistLocation,
    // Cache used to open persist clients.
    persist: Arc<PersistClientCache>,
    // When `true`, the task exits immediately and never finalizes anything.
    read_only: bool,
}
2950
/// Background task that periodically finalizes the shards queued in
/// `finalizable_shards`.
///
/// Every 5 seconds, and only while the `finalize_shards` storage parameter is
/// enabled, this snapshots the finalizable set, finalizes up to 10 shards
/// concurrently, and moves successfully finalized shards into
/// `finalized_shards`. Shards whose finalization errored remain in
/// `finalizable_shards` and are retried on a later tick. In read-only mode the
/// task returns immediately without doing anything.
async fn finalize_shards_task(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) {
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    // If ticks are missed, resume from "now" rather than firing a burst.
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        // Snapshot the set so the lock is not held across the awaits below.
        let current_finalizable_shards = {
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        // Re-borrow everything the per-shard `async move` blocks capture, so
        // they take references rather than ownership.
        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // Advance the shard's upper to the empty antichain so
                        // that no further writes are possible.
                        let mut write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            // Also advance the critical since to the empty
                            // antichain, fencing via the opaque epoch token.
                            let our_opaque = Opaque::encode(&epoch);
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                Timestamp,
                                StorageDiff,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    our_opaque.clone(),
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_opaque = since_handle.opaque().clone();
                            // Pick the token to use as the "expected" value of
                            // the compare-and-downgrade: if the handle's token
                            // decodes to an epoch older than ours, use the
                            // handle's token; otherwise attempt with our own
                            // (and possibly lose the race below).
                            let opaque = if our_opaque.codec_name() == handle_opaque.codec_name()
                                && epoch.0 > handle_opaque.decode::<PersistEpoch>().0
                            {
                                handle_opaque
                            } else {
                                our_opaque
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&opaque, (&opaque, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                // NOTE(review): this returns `Ok`, so the shard
                                // is still removed from the finalizable queue —
                                // presumably the process with the newer epoch
                                // takes over finalization; confirm.
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), Timestamp, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            // `None` marks a failure: the shard stays in
                            // `finalizable_shards` and is retried next tick.
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // Move the shard between the two sets while holding
                        // both locks, so it is never in both (or neither).
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}
3148
/// The as-of at which [`SnapshotStats`] for a collection should be computed.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf {
    /// Stats for a shard that is not managed by txn-wal, computed directly at
    /// the given frontier (via `snapshot_stats`).
    Direct(Antichain<Timestamp>),
    /// Stats for a txn-wal-managed shard, computed from the given data
    /// snapshot (via `snapshot_stats_from_txn`).
    Txns(DataSnapshot<Timestamp>),
}
3158
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    /// Exercises the `BackgroundCmd::SnapshotStats` flow end to end: stats
    /// requests for unknown collections fail immediately, requests at an
    /// as-of the shard's upper has not yet passed block until enough data is
    /// written, and the returned stats reflect the updates written so far.
    //
    // Fix: this is a free function in a test module, so it must not take
    // `&self` (the previous `async fn test_snapshot_stats(&self)` did not
    // compile; `#[tokio::test]` functions take no parameters).
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&PersistEpoch::default()),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        // A second writer, used by the test to append data.
        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // Stats for an unregistered collection must fail.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats at an as-of the upper has not passed yet must not resolve.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Keep a pending request around to resolve later in the test.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Advance the upper past ts 0 by writing one update.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Advance the upper past ts 1; the pending ts-1 request can resolve.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        drop(background_task);
    }

    /// Sends a `SnapshotStats` request for `id` at `as_of` to the background
    /// task and awaits the resulting stats future.
    async fn snapshot_stats(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd>,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl BackgroundTask {
        /// Constructs a `BackgroundTask` wired up to fresh command and holds
        /// channels, suitable for driving directly in tests.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}