1use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::iter;
16use std::num::NonZeroI64;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::future::BoxFuture;
23use futures::stream::{BoxStream, FuturesUnordered};
24use futures::{Future, FutureExt, StreamExt};
25use itertools::Itertools;
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::{EpochMillis, NowFn};
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::{Opaque, SinceHandle};
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::Codec64;
40use mz_persist_types::codec_impls::UnitSchema;
41use mz_persist_types::txn::TxnsCodec;
42use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
43use mz_storage_types::StorageDiff;
44use mz_storage_types::configuration::StorageConfiguration;
45use mz_storage_types::connections::ConnectionContext;
46use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
47use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
48use mz_storage_types::errors::CollectionMissing;
49use mz_storage_types::parameters::StorageParameters;
50use mz_storage_types::read_holds::ReadHold;
51use mz_storage_types::read_policy::ReadPolicy;
52use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
53use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
54use mz_txn_wal::metrics::Metrics as TxnMetrics;
55use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
56use mz_txn_wal::txns::TxnsHandle;
57use timely::PartialOrder;
58use timely::order::TotalOrder;
59use timely::progress::frontier::MutableAntichain;
60use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
61use tokio::sync::{mpsc, oneshot};
62use tokio::time::MissedTickBehavior;
63use tracing::{debug, info, trace, warn};
64
65use crate::client::TimestamplessUpdateBuilder;
66use crate::controller::{
67 CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
68};
69use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
70
/// Metrics for the storage-collections machinery (see [`StorageCollectionsMetrics`]).
mod metrics;
72
/// An abstraction for keeping track of storage collections and their
/// frontiers, and for acquiring snapshots and read holds on them.
///
/// NOTE(review): doc summaries below describe what the visible
/// `StorageCollectionsImpl` does for each method; other implementors may
/// differ.
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    type Timestamp: TimelyTimestamp;

    /// Reconciles durable state with `init_ids`: prepares metadata for any id
    /// that does not have some yet and seeds the set of shards that still
    /// need finalization.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Applies updated storage parameters (including dyncfg updates).
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the metadata for the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns the metadata of all collections that are not dropped.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the collection identified by `id`.
    ///
    /// Convenience wrapper around [`StorageCollections::collections_frontiers`].
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Returns the frontiers for each of the given collections, failing if
    /// any of them is unknown.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;

    /// Returns the frontiers of all collections that are not dropped.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    /// Returns `Ok(())` iff the collection identified by `id` exists.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of `id` at `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    /// Returns statistics about the parts that make up a snapshot of `id` at
    /// `as_of`.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    /// Returns the contents of `id` at `as_of`, fully consolidated in memory.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    /// Returns the contents of `id` at the most recent readable timestamp.
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    /// Returns a cursor over the contents of `id` at `as_of`, for consumers
    /// that want to fetch results incrementally.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    /// Returns a stream over the contents of `id` at `as_of`.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    /// Creates a [`TimestamplessUpdateBuilder`] for writing updates to the
    /// collection identified by `id`.
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    /// Prepares the durable state in `txn` for the given additions, drops,
    /// and explicit shard registrations.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates the described collections during bootstrap.
    ///
    /// `migrated_storage_collections` identifies collections whose on-disk
    /// state was migrated and may need special treatment.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Evolves `existing_collection` to `new_desc` under the identity
    /// `new_collection`, checking against `expected_version`.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the given collections without validating that they exist;
    /// infallible and usable for best-effort cleanup.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Installs the given read policies, which control how each collection's
    /// implied read capability follows its write frontier.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    /// Acquires [`ReadHold`]s for all of `desired_holds`, failing if any of
    /// them is unknown.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;

    /// Determines the [`TimeDependence`] of the collection identified by
    /// `id`, i.e. how its timestamps relate to wall-clock time.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    /// Renders internal state as JSON, for introspection/debugging.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
324
/// A cursor over the contents of a collection snapshot, allowing results to
/// be fetched in multiple batches.
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    // Never read; the leading underscore marks it as held-only.
    // NOTE(review): presumably kept so the read lease stays valid for as long
    // as the cursor is in use — confirm.
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}
333
334impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
335 pub async fn next(
336 &mut self,
337 ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
338 let iter = self.cursor.next().await?;
339 Some(iter.map(|((k, ()), t, d)| (k, t, d)))
340 }
341}
342
/// Frontiers of a storage collection, as reported by the frontier accessors
/// on [`StorageCollections`].
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The id of the collection these frontiers belong to.
    pub id: GlobalId,

    /// The write frontier (upper) of the collection.
    pub write_frontier: Antichain<T>,

    /// The read capability that is held on behalf of the collection itself,
    /// as maintained by its read policy.
    pub implied_capability: Antichain<T>,

    /// The frontier of all outstanding read capabilities on the collection,
    /// including the implied capability.
    pub read_capabilities: Antichain<T>,
}
364
/// Implementation of [`StorageCollections`] backed by persist, with a
/// background task that owns the persist handles.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// This process's epoch, used as the fencing token when taking over
    /// critical persist since handles.
    envd_epoch: NonZeroI64,

    /// When `true`, only leased read handles are acquired and critical since
    /// handles are never opened (see `open_data_handles`).
    read_only: bool,

    /// Shards that still need to be finalized.
    finalizable_shards: Arc<ShardIdSet>,

    /// Shards that have been finalized but whose finalization has not yet
    /// been recorded in durable metadata (see `synchronize_finalized_shards`).
    finalized_shards: Arc<ShardIdSet>,

    /// In-memory state per collection, shared with the background task.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// Handle for reading from the txn-wal shard.
    txns_read: TxnsRead<T>,

    /// Storage configuration, shared with the background tasks.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn shard as observed when this instance was created.
    initial_txn_upper: Antichain<T>,

    /// The persist location all of our shards live at.
    persist_location: PersistLocation,

    /// Cache of persist clients.
    persist: Arc<PersistClientCache>,

    /// Sends commands to the background task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// Sends read-capability changes to the background task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Aborts the background task when the last clone is dropped.
    _background_task: Arc<AbortOnDropHandle<()>>,
    /// Aborts the shard-finalization task when the last clone is dropped.
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}
427
/// Convenience alias for the boxed update stream returned by
/// `snapshot_and_stream`.
type SourceDataStream<T> = BoxStream<'static, (SourceData, T, StorageDiff)>;
437
438impl<T> StorageCollectionsImpl<T>
441where
442 T: TimelyTimestamp
443 + Lattice
444 + Codec64
445 + From<EpochMillis>
446 + TimestampManipulation
447 + Into<mz_repr::Timestamp>
448 + Sync,
449{
    /// Creates and initializes a new instance.
    ///
    /// Opens the txn-wal shard (whose id must already have been recorded via
    /// `prepare_state`/`initialize_state`, hence the `expect`), starts the
    /// background task that owns all persist handles, and starts the task
    /// that finalizes dropped shards.
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // Opened for its side effects (shard registration/initialization);
        // the handle itself is immediately dropped.
        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;

        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        // Snapshot the txn-shard upper before handing `txns_write` to the
        // background task.
        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }
575
    /// Opens the write handle and a since (read) handle for the given shard.
    ///
    /// In read-only mode the since handle is a leased read handle; otherwise
    /// the shard's schema version is upgraded and a critical since handle is
    /// opened. If `since` is given, the returned since handle's since is
    /// advanced to (at least) it.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            // Not read-only: we are allowed to modify the shard, so upgrade
            // its registered version before taking the critical handle.
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // NOTE(review): presumably done so the handle's cached upper is
        // populated before callers use it — confirm.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }
638
639 async fn open_write_handle(
641 &self,
642 id: &GlobalId,
643 shard: ShardId,
644 relation_desc: RelationDesc,
645 persist_client: &PersistClient,
646 ) -> WriteHandle<SourceData, (), T, StorageDiff> {
647 let diagnostics = Diagnostics {
648 shard_name: id.to_string(),
649 handle_purpose: format!("controller data for {}", id),
650 };
651
652 let write = persist_client
653 .open_writer(
654 shard,
655 Arc::new(relation_desc),
656 Arc::new(UnitSchema),
657 diagnostics.clone(),
658 )
659 .await
660 .expect("invalid persist usage");
661
662 write
663 }
664
    /// Opens the critical [`SinceHandle`] for `shard`, fencing out any
    /// previous owner with an older epoch.
    ///
    /// If `since` is given, the handle's since is advanced to (at least) it.
    ///
    /// # Panics / Halts
    ///
    /// Panics when called in read-only mode; halts the process when a newer
    /// epoch already owns the handle (we have been fenced).
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let since_handle = {
            let mut handle = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&PersistEpoch::default()),
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Never move the since backwards: take the join of the current
            // since and the requested one (minimum when none was requested).
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            // Compare-and-swap loop: install our epoch as the handle's opaque
            // token while downgrading the since. A failed CAS means someone
            // raced us, so re-read the epoch and retry; an epoch newer than
            // ours means we have been fenced and must halt.
            loop {
                let current_epoch: PersistEpoch = handle.opaque().decode();

                // A `None` epoch (fresh handle) counts as "older than us".
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &Opaque::encode(&current_epoch),
                            (&Opaque::encode(&PersistEpoch::from(our_epoch)), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
744
    /// Opens a leased [`ReadHandle`] for `shard`, used in read-only mode
    /// instead of a critical since handle.
    ///
    /// If `since` is given, the handle's since is advanced to (at least) it.
    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // We manage the since ourselves; do not start at the critical since.
        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        // Never move the since backwards: join the handle's since with the
        // requested one (minimum when none was requested).
        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }
790
791 fn register_handles(
792 &self,
793 id: GlobalId,
794 is_in_txns: bool,
795 since_handle: SinceHandleWrapper<T>,
796 write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
797 ) {
798 self.send(BackgroundCmd::Register {
799 id,
800 is_in_txns,
801 since_handle,
802 write_handle,
803 });
804 }
805
806 fn send(&self, cmd: BackgroundCmd<T>) {
807 let _ = self.cmd_tx.send(cmd);
808 }
809
    /// Fetches snapshot statistics for `id` at `as_of` by delegating to the
    /// background task, which owns the read handles.
    ///
    /// The task replies with a future that resolves to the stats, hence the
    /// double `await`.
    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }
825
    /// Installs read capabilities on all storage dependencies of collection
    /// `id`, at `id`'s implied capability.
    ///
    /// A no-op when `id` is unknown (returns `Ok(())`). Soft-asserts that no
    /// dependency's since is already in advance of the dependent's since.
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }
870
    /// Determines the ids of other collections that `source_id` depends on,
    /// based on its description: its primary collection (if any), plus
    /// data-source-specific dependencies such as a source's remap collection
    /// or a sink's input.
    fn determine_collection_dependencies(
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            // These kinds have no additional dependencies.
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                // An export depends on its primary source's remap collection,
                // which must already exist.
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                // NOTE(review): CdcV2 exports deliberately take no dependency
                // on the remap collection — confirm intent.
                match data_config.envelope {
                    SourceEnvelope::CdcV2 => (),
                    _ => dependencies.push(*remap_collection_id),
                }
            }
            DataSource::Ingestion(ingestion) => {
                // Guard against self-dependencies when a source is its own
                // remap collection.
                if ingestion.remap_collection_id != source_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
        }

        Ok(dependencies)
    }
919
920 #[instrument(level = "debug")]
922 fn install_read_capabilities_inner(
923 &self,
924 self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
925 from_id: GlobalId,
926 storage_dependencies: &[GlobalId],
927 read_capability: Antichain<T>,
928 ) -> Result<(), StorageError<T>> {
929 let mut changes = ChangeBatch::new();
930 for time in read_capability.iter() {
931 changes.update(time.clone(), 1);
932 }
933
934 if tracing::span_enabled!(tracing::Level::TRACE) {
935 let user_capabilities = self_collections
937 .iter_mut()
938 .filter(|(id, _c)| id.is_user())
939 .map(|(id, c)| {
940 let updates = c.read_capabilities.updates().cloned().collect_vec();
941 (*id, c.implied_capability.clone(), updates)
942 })
943 .collect_vec();
944
945 trace!(
946 %from_id,
947 ?storage_dependencies,
948 ?read_capability,
949 ?user_capabilities,
950 "install_read_capabilities_inner");
951 }
952
953 let mut storage_read_updates = storage_dependencies
954 .iter()
955 .map(|id| (*id, changes.clone()))
956 .collect();
957
958 StorageCollectionsImpl::update_read_capabilities_inner(
959 &self.cmd_tx,
960 self_collections,
961 &mut storage_read_updates,
962 );
963
964 if tracing::span_enabled!(tracing::Level::TRACE) {
965 let user_capabilities = self_collections
967 .iter_mut()
968 .filter(|(id, _c)| id.is_user())
969 .map(|(id, c)| {
970 let updates = c.read_capabilities.updates().cloned().collect_vec();
971 (*id, c.implied_capability.clone(), updates)
972 })
973 .collect_vec();
974
975 trace!(
976 %from_id,
977 ?storage_dependencies,
978 ?read_capability,
979 ?user_capabilities,
980 "after install_read_capabilities_inner!");
981 }
982
983 Ok(())
984 }
985
986 async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
987 let metadata = &self.collection_metadata(id)?;
988 let persist_client = self
989 .persist
990 .open(metadata.persist_location.clone())
991 .await
992 .unwrap();
993 let diagnostics = Diagnostics {
996 shard_name: id.to_string(),
997 handle_purpose: format!("controller data for {}", id),
998 };
999 let write = persist_client
1002 .open_writer::<SourceData, (), T, StorageDiff>(
1003 metadata.data_shard,
1004 Arc::new(metadata.relation_desc.clone()),
1005 Arc::new(UnitSchema),
1006 diagnostics.clone(),
1007 )
1008 .await
1009 .expect("invalid persist usage");
1010 Ok(write.shared_upper())
1011 }
1012
1013 async fn read_handle_for_snapshot(
1014 persist: Arc<PersistClientCache>,
1015 metadata: &CollectionMetadata,
1016 id: GlobalId,
1017 ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1018 let persist_client = persist
1019 .open(metadata.persist_location.clone())
1020 .await
1021 .unwrap();
1022
1023 let read_handle = persist_client
1029 .open_leased_reader::<SourceData, (), _, _>(
1030 metadata.data_shard,
1031 Arc::new(metadata.relation_desc.clone()),
1032 Arc::new(UnitSchema),
1033 Diagnostics {
1034 shard_name: id.to_string(),
1035 handle_purpose: format!("snapshot {}", id),
1036 },
1037 USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1038 )
1039 .await
1040 .expect("invalid persist usage");
1041 Ok(read_handle)
1042 }
1043
    /// Returns the contents of collection `id` at `as_of`, consolidated into
    /// an in-memory vector of rows and diffs.
    ///
    /// Collections that live in the txn-wal shard are read through a
    /// [`DataSnapshot`]; others are read directly from their data shard. A
    /// read failure is reported as [`StorageError::ReadBeforeSince`].
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // Only collections registered in our txn shard take the txn path.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // Wait until the txn shard has passed `as_of` so the
                    // data snapshot is complete at that time.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // `SourceData` wraps a `Result`; surface any stored
                        // error via `?`.
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }
1113
    /// Like [`StorageCollectionsImpl::snapshot`], but returns a stream of
    /// updates instead of materializing everything in memory. The unit value
    /// column is stripped from each update.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<SourceDataStream<T>, StorageError<T>>> {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // Only collections registered in our txn shard take the txn path.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    // Wait until the txn shard has passed `as_of` so the
                    // data snapshot is complete at that time.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            // Drop the unit value column.
            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
    }
1162
    /// Installs the given read policies and applies any resulting changes to
    /// the collections' implied capabilities.
    ///
    /// # Panics
    ///
    /// Panics when a policy references an unknown collection.
    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            // The capability the new policy implies at the current write
            // frontier.
            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            // Only ever advance the implied capability; a policy cannot move
            // it backwards. Record the +/- deltas of the swap.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                // `new_read_capability` now holds the old capability.
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }
1209
1210 fn update_read_capabilities_inner(
1214 cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
1215 collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1216 updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
1217 ) {
1218 let mut collections_net = BTreeMap::new();
1220
1221 while let Some(id) = updates.keys().rev().next().cloned() {
1226 let mut update = updates.remove(&id).unwrap();
1227
1228 if id.is_user() {
1229 trace!(id = ?id, update = ?update, "update_read_capabilities");
1230 }
1231
1232 let collection = if let Some(c) = collections.get_mut(&id) {
1233 c
1234 } else {
1235 let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
1236 if has_positive_updates {
1237 panic!(
1238 "reference to absent collection {id} but we have positive updates: {:?}",
1239 update
1240 );
1241 } else {
1242 continue;
1245 }
1246 };
1247
1248 let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
1249 for (time, diff) in update.iter() {
1250 assert!(
1251 collection.read_capabilities.count_for(time) + diff >= 0,
1252 "update {:?} for collection {id} would lead to negative \
1253 read capabilities, read capabilities before applying: {:?}",
1254 update,
1255 collection.read_capabilities
1256 );
1257
1258 if collection.read_capabilities.count_for(time) + diff > 0 {
1259 assert!(
1260 current_read_capabilities.less_equal(time),
1261 "update {:?} for collection {id} is trying to \
1262 install read capabilities before the current \
1263 frontier of read capabilities, read capabilities before applying: {:?}",
1264 update,
1265 collection.read_capabilities
1266 );
1267 }
1268 }
1269
1270 let changes = collection.read_capabilities.update_iter(update.drain());
1271 update.extend(changes);
1272
1273 if id.is_user() {
1274 trace!(
1275 %id,
1276 ?collection.storage_dependencies,
1277 ?update,
1278 "forwarding update to storage dependencies");
1279 }
1280
1281 for id in collection.storage_dependencies.iter() {
1282 updates
1283 .entry(*id)
1284 .or_insert_with(ChangeBatch::new)
1285 .extend(update.iter().cloned());
1286 }
1287
1288 let (changes, frontier) = collections_net
1289 .entry(id)
1290 .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));
1291
1292 changes.extend(update.drain());
1293 *frontier = collection.read_capabilities.frontier().to_owned();
1294 }
1295
1296 let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
1299 for (key, (mut changes, frontier)) in collections_net {
1300 if !changes.is_empty() {
1301 let collection = collections.get(&key).expect("must still exist");
1303 let should_emit_persist_compaction = collection.primary.is_none();
1304
1305 if frontier.is_empty() {
1306 info!(id = %key, "removing collection state because the since advanced to []!");
1307 collections.remove(&key).expect("must still exist");
1308 }
1309
1310 if should_emit_persist_compaction {
1311 persist_compaction_commands.push((key, frontier));
1312 }
1313 }
1314 }
1315
1316 if !persist_compaction_commands.is_empty() {
1317 cmd_tx
1318 .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
1319 .expect("cannot fail to send");
1320 }
1321 }
1322
    /// Forgets finalized shards whose finalization has been recorded in
    /// durable state: we keep only those that `storage_metadata` still lists
    /// as unfinalized (i.e. whose cleanup is not yet committed).
    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
        self.finalized_shards
            .lock()
            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
    }
1329}
1330
1331#[async_trait]
1333impl<T> StorageCollections for StorageCollectionsImpl<T>
1334where
1335 T: TimelyTimestamp
1336 + Lattice
1337 + Codec64
1338 + From<EpochMillis>
1339 + TimestampManipulation
1340 + Into<mz_repr::Timestamp>
1341 + Sync,
1342{
1343 type Timestamp = T;
1344
    /// Prepares durable state for any id in `init_ids` that has no metadata
    /// yet, and seeds the in-memory set of shards that still need
    /// finalization from the durable unfinalized-shards set.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<T> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<T>> {
        let metadata = txn.get_collection_metadata();
        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();

        // Only ids that do not already have durable metadata.
        let new_collections: BTreeSet<GlobalId> =
            init_ids.difference(&existing_metadata).cloned().collect();

        self.prepare_state(
            txn,
            new_collections,
            BTreeSet::default(),
            BTreeMap::default(),
        )
        .await?;

        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();

        info!(?unfinalized_shards, "initializing finalizable_shards");

        self.finalizable_shards.lock().extend(unfinalized_shards);

        Ok(())
    }
1380
1381 fn update_parameters(&self, config_params: StorageParameters) {
1382 config_params.dyncfg_updates.apply(self.persist.cfg());
1385
1386 self.config
1387 .lock()
1388 .expect("lock poisoned")
1389 .update(config_params);
1390 }
1391
1392 fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1393 let collections = self.collections.lock().expect("lock poisoned");
1394
1395 collections
1396 .get(&id)
1397 .map(|c| c.collection_metadata.clone())
1398 .ok_or(CollectionMissing(id))
1399 }
1400
1401 fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1402 let collections = self.collections.lock().expect("lock poisoned");
1403
1404 collections
1405 .iter()
1406 .filter(|(_id, c)| !c.is_dropped())
1407 .map(|(id, c)| (*id, c.collection_metadata.clone()))
1408 .collect()
1409 }
1410
1411 fn collections_frontiers(
1412 &self,
1413 ids: Vec<GlobalId>,
1414 ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
1415 if ids.is_empty() {
1416 return Ok(vec![]);
1417 }
1418
1419 let collections = self.collections.lock().expect("lock poisoned");
1420
1421 let res = ids
1422 .into_iter()
1423 .map(|id| {
1424 collections
1425 .get(&id)
1426 .map(|c| CollectionFrontiers {
1427 id: id.clone(),
1428 write_frontier: c.write_frontier.clone(),
1429 implied_capability: c.implied_capability.clone(),
1430 read_capabilities: c.read_capabilities.frontier().to_owned(),
1431 })
1432 .ok_or(CollectionMissing(id))
1433 })
1434 .collect::<Result<Vec<_>, _>>()?;
1435
1436 Ok(res)
1437 }
1438
1439 fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1440 let collections = self.collections.lock().expect("lock poisoned");
1441
1442 let res = collections
1443 .iter()
1444 .filter(|(_id, c)| !c.is_dropped())
1445 .map(|(id, c)| CollectionFrontiers {
1446 id: id.clone(),
1447 write_frontier: c.write_frontier.clone(),
1448 implied_capability: c.implied_capability.clone(),
1449 read_capabilities: c.read_capabilities.frontier().to_owned(),
1450 })
1451 .collect_vec();
1452
1453 res
1454 }
1455
    /// Returns statistics about a snapshot of collection `id` at `as_of`,
    /// routing through the txns shard when the collection is txns-backed.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        // Direct shards can be read at the given as_of as-is; txns-backed
        // shards must first consult the txns shard for a valid data snapshot.
        let as_of = match metadata.txns_shard.as_ref() {
            None => SnapshotStatsAsOf::Direct(as_of),
            Some(txns_id) => {
                // We only ever work against a single txns shard.
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                // Wait until the txns shard has progressed past `as_of` so the
                // resulting data snapshot is sealed.
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }
1482
    /// Returns per-part statistics about a snapshot of `id` at `as_of`, as a
    /// future that no longer borrows `self`.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        // Copy the metadata out so the collections lock is released before any
        // awaiting happens.
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                // Surface the lookup failure through the returned future.
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // For txns-backed collections, resolve a data snapshot now (requires
        // `self`); the returned 'static future then only uses the snapshot.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // Explicitly expire the handle rather than relying on Drop.
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1537
    /// Returns the contents of collection `id` at `as_of`.
    ///
    /// Delegates to the inherent `snapshot` method (inherent methods win name
    /// resolution over this trait method), passing along our txns-shard read
    /// handle so txns-backed collections are handled correctly.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }
1550
    /// Returns the rows of collection `id` as of the most recent readable
    /// time, i.e. one step before the current upper.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot read fails or if the snapshot does not
    /// accumulate to a set (any diff != 1).
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            // Upper has advanced past the minimum: read at upper - 1, the
            // latest fully-written time.
            Some(f) if f > &T::minimum() => {
                let as_of = f.step_back().unwrap();

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            // Upper is still at the minimum: nothing has been written yet.
            Some(_min) => {
                Vec::new()
            }
            // Empty upper: the collection is closed, no read time exists.
            _ => {
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }
1585
    /// Returns a cursor over a snapshot of `id` at `as_of`, keeping the read
    /// handle alive alongside the cursor.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // Clone the txns read handle (if this collection is txns-backed) so
        // the returned future does not borrow `self`.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                // Direct shard: read straight from persist.
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                // Txns-backed shard: resolve a sealed data snapshot first.
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }
1640
    /// Streams the contents of `id` at `as_of`; delegates to the inherent
    /// `snapshot_and_stream` method with our txns-shard read handle.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }
1657
    /// Creates a builder for timestampless update batches targeting the shard
    /// backing collection `id`.
    ///
    /// # Panics
    ///
    /// The returned future panics on invalid persist usage (bad location or
    /// writer schema).
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    > {
        let metadata = match self.collection_metadata(id) {
            Ok(m) => m,
            Err(e) => return Box::pin(async move { Err(e.into()) }),
        };
        // Clone the persist cache handle so the future is 'static.
        let persist = Arc::clone(&self.persist);

        async move {
            let persist_client = persist
                .open(metadata.persist_location.clone())
                .await
                .expect("invalid persist usage");
            let write_handle = persist_client
                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
                    metadata.data_shard,
                    Arc::new(metadata.relation_desc.clone()),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("create write batch {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");
            // The builder only needs the handle to learn the shard's schema.
            let builder = TimestamplessUpdateBuilder::new(&write_handle);

            Ok(builder)
        }
        .boxed()
    }
1697
1698 fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1699 let collections = self.collections.lock().expect("lock poisoned");
1700
1701 if collections.contains_key(&id) {
1702 Ok(())
1703 } else {
1704 Err(StorageError::IdentifierMissing(id))
1705 }
1706 }
1707
    /// Records upcoming collection changes in `txn`: allocates shards for new
    /// collections, registers pre-existing shards, moves dropped collections'
    /// shards to the unfinalized set, and marks already-finalized shards.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        // Fresh shards for brand-new collections...
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        // ...and caller-provided shards for re-registered ones.
        txn.insert_collection_metadata(ids_to_register)?;

        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        // Dropped shards become unfinalized until the background task
        // finalizes them.
        txn.insert_unfinalized_shards(dropped_shards)?;

        // Persist the shards we have finalized since the last txn.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }
1740
    /// Creates collection state for all of `collections`, opening persist
    /// handles concurrently, then registering collections in dependency order
    /// so that read-hold installation always finds dependencies present.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // A collection lives in the txns system unless it is a migrated
        // collection while we are in read-only mode.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Exact duplicates are tolerated (deduped); same id with differing
        // descriptions is an error.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // Pair each collection with its metadata (shard, schema, txns shard).
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        // Open persist handles for all collections with bounded concurrency.
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    // Collections with a primary share its shard; don't try
                    // to set an initial since for them.
                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            // Brand-new (non-migrated) tables get their since
                            // advanced to the registration timestamp.
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            .buffer_unordered(50)
            .try_collect()
            .await?;

        // Registration order: tables in reverse id order first (newer tables
        // may be the primaries of older ones), then collections, then sinks
        // (which depend on collections).
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            Table(Reverse<GlobalId>),
            Collection(GlobalId),
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            // The collection's initial since must not be behind the since of
            // its (at most one) dependency, or read holds could not be taken.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // Sanity check: a dependency's since should never
                        // overtake a non-empty, non-initial dependent upper.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            // Only collections in the EpochMilliseconds timeline (or with no
            // timeline) can depend on wall-clock time.
            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                LoadGenerator(_) => None,
                            }
                        }
                        // Exports inherit their ingestion's time dependence.
                        IngestionExport { ingestion_id, .. } => {
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            match &description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport { .. } => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    // Txns-backed tables logically track the txns shard's
                    // upper, which may be ahead of the data shard's upper.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            // Hand the persist handles to the background task, which keeps
            // frontiers up to date.
            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2093
    /// Evolves the persist schema of the shard backing `existing_collection`
    /// to `new_desc` and registers `new_collection` as the new primary
    /// collection on that same shard.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Scope the lock: only the shard id is needed before the async work.
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        // Mismatch/incompatibility indicate a coordination bug; soft-panic in
        // development, surface a generic error in production.
        match schema_result {
            CaESchema::Ok(id) => id,
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        // Open handles for the new collection against the same data shard.
        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // A collection can only be superseded once.
            assert_none!(existing.primary);

            existing.primary = Some(new_collection);
            // The old collection now depends on the new one (shared shard).
            existing.storage_dependencies.push(new_collection);

            // The new collection inherits the old one's read frontier and
            // write frontier.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            // Install the new collection's initial read capability.
            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: new_desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                None,
                existing.time_dependence.clone(),
                existing.ingestion_remap_collection_id.clone(),
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        // Tables are always in txns, hence `is_in_txns == true`.
        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }
2246
    /// Drops the given collections without validating that they are safe to
    /// drop, by releasing their read policies entirely.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        let mut finalized_policies = Vec::new();

        for id in identifiers {
            // Unknown collections are silently skipped; dropping is
            // best-effort and may race with other drops.
            let Some(collection) = self_collections.get(&id) else {
                continue;
            };

            // Collections without a primary should already have been removed
            // from durable metadata via `prepare_state`; warn if not.
            if collection.primary.is_none() {
                let metadata = storage_metadata.get_collection_shard::<T>(id);
                mz_ore::soft_assert_or_log!(
                    matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                    "dropping {id}, but drop was not synchronized with storage \
                    controller via `prepare_state`"
                );
            }

            // `ValidFrom(empty)` releases the implied capability, which marks
            // the collection as dropped once all other holds are released.
            finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
        }

        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }
2291
    /// Installs the given read policies, logging user-collection capabilities
    /// before and after when TRACE logging is enabled.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        // Snapshot capabilities before the change, for debugging; guarded so
        // the iteration cost is only paid when TRACE is active.
        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        // And after, to show the effect of the policy change.
        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }
2323
    /// Acquires a read hold at the current read frontier of each requested
    /// collection, failing wholesale if any collection is unknown.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        // First validate all ids and record the frontier to hold at, so we
        // don't mutate capabilities if any lookup fails.
        let mut advanced_holds = Vec::new();
        for id in desired_holds.iter() {
            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        // Bump the capability count at each held frontier element.
        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        // Returned holds release themselves through `holds_tx` when dropped.
        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }
2375
2376 fn determine_time_dependence(
2378 &self,
2379 id: GlobalId,
2380 ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2381 use TimeDependenceError::CollectionMissing;
2382 let collections = self.collections.lock().expect("lock poisoned");
2383 let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2384 Ok(state.time_dependence.clone())
2385 }
2386
    /// Renders internal state as JSON for debugging/introspection.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
        // Exhaustive destructuring: adding a field to `Self` forces a decision
        // here about whether it belongs in the dump.
        let Self {
            envd_epoch,
            read_only,
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read: _,
            config,
            initial_txn_upper,
            persist_location,
            persist: _,
            cmd_tx: _,
            holds_tx: _,
            _background_task: _,
            _finalize_shards_task: _,
        } = self;

        let finalizable_shards: Vec<_> = finalizable_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        let finalized_shards: Vec<_> = finalized_shards
            .lock()
            .iter()
            .map(ToString::to_string)
            .collect();
        // Collections and config are not Serialize; render via Debug.
        let collections: BTreeMap<_, _> = collections
            .lock()
            .expect("poisoned")
            .iter()
            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
            .collect();
        let config = format!("{:?}", config.lock().expect("poisoned"));

        Ok(serde_json::json!({
            "envd_epoch": envd_epoch,
            "read_only": read_only,
            "finalizable_shards": finalizable_shards,
            "finalized_shards": finalized_shards,
            "collections": collections,
            "config": config,
            "initial_txn_upper": initial_txn_upper,
            "persist_location": format!("{persist_location:?}"),
        }))
    }
2435}
2436
/// A handle that controls the since frontier of a shard, abstracting over
/// critical (durable, opaque-token-guarded) and leased read handles so the
/// rest of the code can treat both uniformly.
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    /// A critical since handle; survives restarts and uses an opaque token
    /// for compare-and-set downgrades.
    Critical(SinceHandle<SourceData, (), T, StorageDiff>),
    /// A leased read handle; its hold lapses when the lease expires.
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}
2451
impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    /// Returns the current since frontier of the underlying handle.
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    /// Returns the opaque token guarding critical downgrades; leased handles
    /// have no token, represented as `PersistEpoch(None)`.
    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().decode(),
            Self::Leased(_handle) => {
                PersistEpoch(None)
            }
        }
    }

    /// Downgrades the since frontier, checking `expected` against the opaque
    /// token for critical handles. Leased handles downgrade unconditionally
    /// (and assert that no token was expected).
    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        (opaque, since): (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle
                .compare_and_downgrade_since(
                    &Opaque::encode(expected),
                    (&Opaque::encode(opaque), since),
                )
                .await
                .map_err(|e| e.decode()),
            Self::Leased(handle) => {
                // Leased handles carry no opaque token.
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    /// Like `compare_and_downgrade_since`, but the underlying handle may
    /// decide to skip the downgrade (returning `None` for critical handles).
    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        (opaque, since): (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => handle
                .maybe_compare_and_downgrade_since(
                    &Opaque::encode(expected),
                    (&Opaque::encode(opaque), since),
                )
                .await
                .map(|r| r.map_err(|o| o.decode())),
            Self::Leased(handle) => {
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    /// Returns snapshot statistics at `as_of` as a 'static future; failures
    /// are mapped to `ReadBeforeSince`.
    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    /// Like `snapshot_stats`, but for txns-backed collections, reading via a
    /// previously-resolved `DataSnapshot`.
    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}
2562
/// In-memory book-keeping for a single storage collection.
#[derive(Debug, Clone)]
struct CollectionState<T> {
    // If this collection has been superseded (e.g. by `alter_table_desc`),
    // the id of the collection that replaced it.
    primary: Option<GlobalId>,

    // Relationship of this collection to wall-clock time, if known.
    time_dependence: Option<TimeDependence>,
    // For ingestions, the id of the remap/progress collection.
    ingestion_remap_collection_id: Option<GlobalId>,

    /// Accumulation of all outstanding read capabilities; its frontier is the
    /// effective since of the collection.
    pub read_capabilities: MutableAntichain<T>,

    /// The capability we hold on behalf of the collection itself, maintained
    /// according to `read_policy`.
    pub implied_capability: Antichain<T>,

    /// Policy that determines how `implied_capability` tracks the write
    /// frontier.
    pub read_policy: ReadPolicy<T>,

    /// Other collections whose sinces this collection constrains (holds).
    pub storage_dependencies: Vec<GlobalId>,

    /// Upper frontier of writes to the collection.
    pub write_frontier: Antichain<T>,

    /// Persist-level metadata (location, shard, schema, txns shard).
    pub collection_metadata: CollectionMetadata,
}
2602
impl<T: TimelyTimestamp> CollectionState<T> {
    /// Creates state for a new collection whose effective since starts at
    /// `since`, with no read policy installed yet (`NoPolicy`).
    pub fn new(
        primary: Option<GlobalId>,
        time_dependence: Option<TimeDependence>,
        ingestion_remap_collection_id: Option<GlobalId>,
        since: Antichain<T>,
        write_frontier: Antichain<T>,
        storage_dependencies: Vec<GlobalId>,
        metadata: CollectionMetadata,
    ) -> Self {
        // Seed the capability accumulation with one capability at `since`,
        // representing the implied capability installed below.
        let mut read_capabilities = MutableAntichain::new();
        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
        Self {
            primary,
            time_dependence,
            ingestion_remap_collection_id,
            read_capabilities,
            implied_capability: since.clone(),
            read_policy: ReadPolicy::NoPolicy {
                initial_since: since,
            },
            storage_dependencies,
            write_frontier,
            collection_metadata: metadata,
        }
    }

    /// A collection counts as dropped once every read capability (including
    /// the implied one) has been released.
    pub fn is_dropped(&self) -> bool {
        self.read_capabilities.is_empty()
    }
}
2637
2638#[derive(Debug)]
2644struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
2645 config: Arc<Mutex<StorageConfiguration>>,
2646 cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
2647 cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
2648 holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
2649 finalizable_shards: Arc<ShardIdSet>,
2650 collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
2651 shard_by_id: BTreeMap<GlobalId, ShardId>,
2654 since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
2655 txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
2656 txns_shards: BTreeSet<GlobalId>,
2657}
2658
/// Commands sent from `StorageCollectionsImpl` to its background task.
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    /// Hand over the persist handles for a newly created collection.
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    /// Downgrade the since of the given collections to the given frontiers.
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    /// Compute snapshot stats for a collection, replying on the channel.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}
2674
/// Reply to a `BackgroundCmd::SnapshotStats`: a boxed `'static` future that
/// resolves to the stats (or a storage error).
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    // The wrapped future has no Debug impl, so only the type name is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}
2683
/// The background task that tracks write frontiers, applies read-capability
/// changes, and performs since downgrades off the main control path.
impl<T> BackgroundTask<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    /// Runs the task's main loop until the command channel closes.
    ///
    /// The loop multiplexes over: upper advancements of the txns shard
    /// (which stand in for the uppers of all txn-backed collections), upper
    /// advancements of individually registered write handles, incoming
    /// [`BackgroundCmd`]s, and read-holds changes.
    async fn run(&mut self) {
        // One in-flight future per registered (non-txns) collection. Each
        // resolves when that collection's upper advances past the previously
        // observed frontier and yields the write handle back so it can be
        // re-armed below.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        // Builds a future that waits until `handle`'s upper moves past
        // `prev_upper` and yields `(id, handle, new_upper)`.
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        // The txns shard (if any) gets its own dedicated future. The
        // `GlobalId::Transient(1)` is only a placeholder id; results from
        // this future are fanned out to all ids in `self.txns_shards` below.
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            // No txns shard registered: a future that never fires.
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    // The txns shard's upper stands in for the upper of
                    // every txn-backed collection.
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    // Re-arm to wait for the next advancement.
                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    // Only apply the update if this handle still belongs to
                    // the collection: the id may meanwhile have been
                    // re-registered with a different shard.
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            // An empty upper means the shard is closed;
                            // stop polling it.
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // Stale handle for a replaced shard: expire it.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        // Channel closed: all senders dropped, shut down.
                        break;
                    };

                    // Opportunistically drain any further queued commands so
                    // the since downgrades can be batched below.
                    let commands = iter::once(cmd).chain(
                        iter::from_fn(|| self.cmds_rx.try_recv().ok())
                    );
                    // Join of all requested since downgrades, keyed by id.
                    let mut downgrades = BTreeMap::<_, Antichain<_>>::new();
                    for cmd in commands {
                        match cmd {
                            BackgroundCmd::Register{
                                id,
                                is_in_txns,
                                write_handle,
                                since_handle
                            } => {
                                debug!("registering handles for {}", id);
                                let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                                if previous.is_some() {
                                    panic!("already registered a WriteHandle for collection {id}");
                                }

                                let previous = self.since_handles.insert(id, since_handle);
                                if previous.is_some() {
                                    panic!("already registered a SinceHandle for collection {id}");
                                }

                                if is_in_txns {
                                    // Uppers for txn-backed collections come
                                    // from the txns shard, not this handle.
                                    self.txns_shards.insert(id);
                                } else {
                                    // Only poll shards whose upper is not
                                    // yet the empty frontier.
                                    let upper = write_handle.upper().clone();
                                    if !upper.is_empty() {
                                        let fut = gen_upper_future(id, write_handle, upper);
                                        upper_futures.push(fut.boxed());
                                    }
                                }
                            }
                            BackgroundCmd::DowngradeSince(cmds) => {
                                for (id, new) in cmds {
                                    downgrades.entry(id)
                                        .and_modify(|since| since.join_assign(&new))
                                        .or_insert(new);
                                }
                            }
                            BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                                // Respond with a future instead of awaiting
                                // the stats here, so this loop stays
                                // responsive.
                                let res = match self.since_handles.get(&id) {
                                    Some(x) => {
                                        let fut: BoxFuture<
                                            'static,
                                            Result<SnapshotStats, StorageError<T>>,
                                        > = match as_of {
                                            SnapshotStatsAsOf::Direct(as_of) => {
                                                x.snapshot_stats(id, Some(as_of))
                                            }
                                            SnapshotStatsAsOf::Txns(data_snapshot) => {
                                                x.snapshot_stats_from_txn(id, data_snapshot)
                                            }
                                        };
                                        SnapshotStatsRes(fut)
                                    }
                                    None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                        StorageError::IdentifierMissing(id),
                                    )))),
                                };
                                // Ignore send errors: the requester may have
                                // gone away.
                                let _ = tx.send(res);
                            }
                        }
                    }

                    if !downgrades.is_empty() {
                        self.downgrade_sinces(downgrades).await;
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    // Batch all currently queued holds changes, merging
                    // changes that target the same collection.
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    // Only used for tracing below.
                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

    /// Applies new write frontiers to the tracked collections and, where the
    /// read policy allows, derives and applies the resulting read-capability
    /// changes.
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            // Write frontiers only ever advance; ignore regressions.
            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            // The capability the read policy implies for the (possibly
            // updated) write frontier.
            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            // Only ever advance the implied capability, never regress it.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                // After the swap, `new_read_capability` holds the *old*
                // capability, which we retract.
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

    /// Concurrently downgrades the since handles of the given collections to
    /// the given frontiers. When a since advances to the empty frontier, the
    /// collection's handles are dropped and its shard is (config permitting)
    /// enqueued for finalization.
    async fn downgrade_sinces(&mut self, cmds: BTreeMap<GlobalId, Antichain<T>>) {
        let mut futures = Vec::with_capacity(cmds.len());
        for (id, new_since) in cmds {
            // Temporarily remove the handle so it can be moved into the
            // future; it is re-inserted after the downgrade completes.
            let Some(mut since_handle) = self.since_handles.remove(&id) else {
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            let fut = async move {
                if id.is_user() {
                    trace!("downgrading since of {} to {:?}", id, new_since);
                }

                let epoch = since_handle.opaque().clone();
                let result = if new_since.is_empty() {
                    // Downgrading to the empty frontier is unconditional, so
                    // use the non-"maybe" variant.
                    Some(
                        since_handle
                            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                            .await,
                    )
                } else {
                    since_handle
                        .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await
                };
                (id, since_handle, result)
            };
            futures.push(fut);
        }

        for (id, since_handle, result) in futures::future::join_all(futures).await {
            let new_since = match result {
                Some(Ok(since)) => Some(since),
                // A mismatched opaque means a newer environment fenced us
                // out: halt rather than continue with stale state.
                Some(Err(other_epoch)) => mz_ore::halt!(
                    "fenced by envd @ {other_epoch:?}. ours = {:?}",
                    since_handle.opaque(),
                ),
                None => None,
            };

            // Put the handle back; it may be removed again just below.
            self.since_handles.insert(id, since_handle);

            if new_since.is_some_and(|s| s.is_empty()) {
                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let Some(dropped_shard_id) = self.shard_by_id.remove(&id) else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                self.txns_shards.remove(&id);

                if self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        %id, %dropped_shard_id,
                        "enqueuing shard finalization due to dropped collection and dropped \
                        persist handle",
                    );
                    self.finalizable_shards.lock().insert(dropped_shard_id);
                } else {
                    info!(
                        "not triggering shard finalization due to dropped storage object \
                        because enable_storage_shard_finalization parameter is false"
                    );
                }
            }
        }
    }
}
3033
/// Bundled arguments for [`finalize_shards_task`].
struct FinalizeShardsTaskConfig {
    // Epoch used to fence since-handle downgrades during finalization.
    envd_epoch: NonZeroI64,
    config: Arc<Mutex<StorageConfiguration>>,
    metrics: StorageCollectionsMetrics,
    // Shards queued for finalization; drained by the task.
    finalizable_shards: Arc<ShardIdSet>,
    // Shards the task has successfully finalized.
    finalized_shards: Arc<ShardIdSet>,
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    // When true, the task exits immediately without finalizing anything.
    read_only: bool,
}
3044
/// Periodically finalizes shards enqueued in `finalizable_shards`: advances
/// each shard's upper to the empty antichain, optionally downgrades its
/// critical since to the empty antichain, and calls persist's
/// `finalize_shard`. Shards that finish are moved to `finalized_shards`.
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    // Finalization mutates shards, which must not happen in read-only mode.
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        // Finalization can be toggled at runtime via a storage parameter.
        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        // Snapshot the current queue; entries added while we work are
        // picked up on the next tick.
        let current_finalizable_shards = {
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        // Re-borrow so the async closures below capture by reference.
        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    // `Some(shard_id)` on success, `None` on failure.
                    let finalize = || async move {
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // Advance the shard's upper to the empty antichain so
                        // no further writes can land.
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let our_opaque = Opaque::encode(&epoch);
                            let mut since_handle: SinceHandle<SourceData, (), T, StorageDiff> =
                                persist_client
                                    .open_critical_since(
                                        shard_id,
                                        PersistClient::CONTROLLER_CRITICAL_SINCE,
                                        our_opaque.clone(),
                                        Diagnostics::from_purpose("finalizing shards"),
                                    )
                                    .await
                                    .expect("invalid persist usage");
                            let handle_opaque = since_handle.opaque().clone();
                            // Choose the token the compare-and-downgrade
                            // below must match: when our epoch is strictly
                            // newer than the handle's, pass the handle's
                            // current token; otherwise pass ours, which the
                            // CaS rejects if a newer epoch owns the handle.
                            let opaque = if our_opaque.codec_name() == handle_opaque.codec_name()
                                && epoch.0 > handle_opaque.decode::<PersistEpoch>().0
                            {
                                handle_opaque
                            } else {
                                our_opaque
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&opaque, (&opaque, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            // Bound the number of concurrent finalizations.
            .buffer_unordered(10)
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // Scope the lock guards so they drop before the
                        // metric update.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}
3240
/// The as-of at which snapshot statistics for a collection are computed.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats computed directly on the collection's shard at the given as-of
    /// (for shards not managed by the txn-wal system).
    Direct(Antichain<T>),
    /// Stats computed from a txn-wal `DataSnapshot` (for txn-backed shards).
    Txns(DataSnapshot<T>),
}
3250
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    /// Exercises the `BackgroundCmd::SnapshotStats` path end-to-end against
    /// an in-memory persist shard: missing collections error, stats at a
    /// readable as-of resolve, and stats at a not-yet-available as-of only
    /// resolve once the shard's upper advances past it.
    //
    // Fix: this is a free test function; it must not take `&self` (test
    // functions take no arguments, and `self` is only valid inside an impl).
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_snapshot_stats() {
        // Fully in-memory persist setup; no disk or network involved.
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        // Register a collection under GlobalId::User(1).
        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&PersistEpoch::default()),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        // A second writer, used below to append test data.
        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // Stats for an unregistered collection must error.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // The shard's upper is still 0, so stats at as-of 1 are not ready.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Keep a pending as-of-1 request around; it should resolve once the
        // upper advances past 1.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write one update at ts 0, advancing the upper to 1.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write a second update at ts 1, advancing the upper to 2; this
        // unblocks the pending as-of-1 request.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        drop(background_task);
    }

    /// Requests snapshot stats for `id` at `as_of` via the command channel
    /// and awaits the returned future.
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        /// Constructs a minimal `BackgroundTask` (plus its command sender)
        /// suitable for unit tests; the persist arguments are unused.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}