use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

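/// Runs as-of selection for the given dataflows.
///
/// Writes the selected as-of into each dataflow's `as_of` field and returns
/// the storage read holds acquired for the dataflows' source imports.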
pub fn run<T: TimestampManipulation>(
    dataflows: &mut [DataflowDescription<Plan<T>, (), T>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy<T>>,
    storage_collections: &dyn StorageCollections<Timestamp = T>,
    current_time: T,
    read_only_mode: bool,
) -> BTreeMap<GlobalId, ReadHold<T>> {
    // Acquire read holds on the dataflows' storage inputs, to prevent their
    // since frontiers from advancing while as-ofs are being selected.
    let mut storage_read_holds = BTreeMap::new();
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows sinking into sealed storage collections have no work left to
    // do, so they are excluded from as-of selection.
    ctx.prune_sealed_persist_sinks();

    if read_only_mode {
        // In read-only mode, dataflows may reference storage collections that
        // have since been dropped; such dataflows can never hydrate, so they
        // are excluded as well.
        ctx.prune_dropped_collections();
    }

    // Apply the hard constraints first, then the soft constraints.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    ctx.apply_warmup_constraints();
    ctx.apply_index_read_policy_constraints();
    ctx.apply_index_current_time_constraints();

    // Apply the derived as-of bounds to the dataflows. All exports of a
    // dataflow share the same bounds, so consulting the first export suffices.
    for dataflow in dataflows {
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

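/// Bounds for the as-of frontier of a collection.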
#[derive(Debug)]
struct AsOfBounds<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    /// Whether these bounds can still change.
    sealed: bool,
}

impl<T: Clone> AsOfBounds<T> {
    /// Creates `AsOfBounds` that allow only the given single `frontier`.
    fn single(frontier: Antichain<T>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Returns the bound of the given `BoundType`.
    fn get(&self, type_: BoundType) -> &Antichain<T> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl<T: Timestamp> Default for AsOfBounds<T> {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(T::minimum()),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl<T: fmt::Debug> fmt::Display for AsOfBounds<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

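/// The two types of as-of bounds.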
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

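/// Types of constraints that can be applied to as-of bounds.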
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints must be applied; failing to do so is an error.
    Hard,
    /// Soft constraints are applied on a best-effort basis.
    Soft,
}

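/// A constraint that can be applied to the as-of bounds of a collection.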
#[derive(Debug)]
struct Constraint<'a, T> {
    type_: ConstraintType,
    /// Which bound this constraint applies to.
    bound_type: BoundType,
    /// The frontier by which the bound is constrained.
    frontier: &'a Antichain<T>,
    /// A human-readable description of the reason for the constraint, for
    /// logging.
    reason: &'a str,
}

impl<T: Timestamp> Constraint<'_, T> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns `Ok(changed)` if the constraint was applied, with `changed`
    /// reporting whether the bounds changed as a result. Returns
    /// `Err(changed)` if the constraint could not be applied because it
    /// conflicts with the opposite bound, in which case the bounds become
    /// sealed. Applying a constraint to sealed bounds is a no-op.
    fn apply(&self, bounds: &mut AsOfBounds<T>) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

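/// State tracked for a compute collection during as-of selection.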
struct Collection<'a, T> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy<T>>,
    /// The as-of bounds, shared by all exports of the same dataflow.
    bounds: Rc<RefCell<AsOfBounds<T>>>,
    /// Whether this collection is an index.
    is_index: bool,
}

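/// The as-of selection context.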
struct Context<'a, T> {
    collections: BTreeMap<GlobalId, Collection<'a, T>>,
    storage_collections: &'a dyn StorageCollections<Timestamp = T>,
    current_time: T,
}

impl<'a, T: TimestampManipulation> Context<'a, T> {
    /// Initializes a fresh as-of selection context for the given dataflows.
    fn new(
        dataflows: &[DataflowDescription<Plan<T>, (), T>],
        storage_collections: &'a dyn StorageCollections<Timestamp = T>,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy<T>>,
        current_time: T,
    ) -> Self {
        // Initialize the as-of bounds. All exports of a dataflow share a
        // single `AsOfBounds` instance.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

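    /// Returns the collection with the given ID, panicking if it doesn't
    /// exist.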
    fn expect_collection(&self, id: GlobalId) -> &Collection<'_, T> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

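    /// Applies the given constraint to the as-of bounds of the given
    /// collection and logs the outcome.
    ///
    /// Returns whether the bounds were changed as a result.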
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint<T>) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

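    /// Applies hard as-of constraints imposed by the read frontiers of the
    /// dataflows' storage inputs.
    ///
    /// A collection's as-of must be at or beyond the since of its storage
    /// inputs, or the dataflow would be unable to read its input data.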
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold<T>>,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Lower bounds propagate downstream: a collection cannot read its
        // compute inputs before their as-ofs.
        self.propagate_bounds_downstream(BoundType::Lower);
    }

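    /// Applies hard as-of constraints imposed by the write frontiers of the
    /// dataflows' storage exports.
    ///
    /// A dataflow sinking into an existing storage collection must not skip
    /// past times that have not yet been written to that collection, so its
    /// as-of is bounded from above by the collection's write frontier,
    /// stepped back by one. If the target collection is still empty, its read
    /// frontier is used instead.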
    fn apply_downstream_storage_constraints(&self) {
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                Antichain::from_iter(
                    frontiers
                        .write_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // Upper bounds propagate upstream: the inputs of a constrained
        // collection are constrained as well.
        self.propagate_bounds_upstream(BoundType::Upper);
    }

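    /// Applies soft as-of constraints that help dataflows hydrate from their
    /// storage inputs without delay.
    ///
    /// Selecting an as-of beyond the write frontiers of a dataflow's
    /// (transitive) storage inputs would force the dataflow to wait for those
    /// inputs to catch up before it can hydrate, so the as-of is bounded from
    /// above by those write frontiers, stepped back by one.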
    fn apply_warmup_constraints(&self) {
        // Collect the write frontiers of each collection's direct storage
        // inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate the write frontiers through the dependency graph, to
        // cover transitive storage inputs as well.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        for (id, write_frontier) in write_frontiers {
            let upper = step_back_frontier(&write_frontier);
            let constraint = Constraint {
                type_: ConstraintType::Soft,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!(
                    "warmup frontier derived from storage write frontier {:?}",
                    write_frontier.elements()
                ),
            };
            self.apply_constraint(id, constraint);
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

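    /// Applies soft as-of constraints imposed by the read policies of the
    /// dataflows' index exports.
    ///
    /// An index's as-of should honor its read policy, applied to the write
    /// frontiers of its (transitive) storage inputs. If an index has no
    /// storage inputs, or they have all advanced to the empty frontier, the
    /// current time is used as the write frontier instead.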
    fn apply_index_read_policy_constraints(&self) {
        // Collect the write frontiers of each collection's transitive storage
        // inputs, as in `apply_warmup_constraints`.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply each index's read policy to its derived write frontier.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

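    /// Applies soft as-of constraints ensuring that indexes are immediately
    /// readable, i.e. that their as-ofs are not beyond the current time.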
    fn apply_index_current_time_constraints(&self) {
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

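    /// Propagates as-of bounds of the given type downstream, i.e. from
    /// collections to the collections that depend on them, until a fixpoint
    /// is reached.
    ///
    /// Lower bounds propagate downstream as hard constraints (a collection's
    /// as-of must be at least that of its compute inputs), upper bounds only
    /// as soft constraints.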
    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Hard,
            BoundType::Upper => ConstraintType::Soft,
        };

        fixpoint(|changed| {
            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);

            // Lowering a collection's upper bound can leave it below the
            // upper bounds of its other inputs, so re-establish consistency
            // by propagating upper bounds back upstream as hard constraints.
            if bound_type == BoundType::Upper {
                self.propagate_bounds_upstream_inner(
                    BoundType::Upper,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

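    /// A single downstream propagation pass; sets `changed` if any bounds
    /// changed.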
    fn propagate_bounds_downstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.compute_inputs {
                let input_collection = self.expect_collection(*input_id);
                let bounds = input_collection.bounds.borrow();
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*id, constraint);
            }
        }
    }

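    /// Propagates as-of bounds of the given type upstream, i.e. from
    /// collections to their compute inputs, until a fixpoint is reached.
    ///
    /// Upper bounds propagate upstream as hard constraints (an input's as-of
    /// must not exceed that of its dependants), lower bounds only as soft
    /// constraints.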
    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Soft,
            BoundType::Upper => ConstraintType::Hard,
        };

        fixpoint(|changed| {
            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);

            // Raising a collection's lower bound can leave it above the lower
            // bounds of its dependants, so re-establish consistency by
            // propagating lower bounds back downstream as hard constraints.
            if bound_type == BoundType::Lower {
                self.propagate_bounds_downstream_inner(
                    BoundType::Lower,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

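    /// A single upstream propagation pass; sets `changed` if any bounds
    /// changed.
    ///
    /// Collections are visited in reverse ID order; since dependants commonly
    /// have higher IDs than their inputs, this lets bounds propagate far in a
    /// single pass.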
    fn propagate_bounds_upstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in self.collections.iter().rev() {
            let bounds = collection.bounds.borrow();
            for input_id in &collection.compute_inputs {
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("downstream {id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*input_id, constraint);
            }
        }
    }

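    /// Returns the best as-of for the given collection: the upper of its
    /// as-of bounds, i.e. the latest allowed as-of. If the collection is
    /// unknown (e.g. because it was pruned), the empty frontier is returned.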
    fn best_as_of(&self, id: GlobalId) -> Antichain<T> {
        if let Some(collection) = self.collections.get(&id) {
            let bounds = collection.bounds.borrow();
            bounds.upper.clone()
        } else {
            Antichain::new()
        }
    }

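    /// Removes collections that sink into sealed storage collections, i.e.
    /// whose storage write frontier is empty.
    ///
    /// Pruned collections end up with an empty as-of in `run`.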
    fn prune_sealed_persist_sinks(&mut self) {
        self.collections.retain(|id, _| {
            self.storage_collections
                .collection_frontiers(*id)
                .map_or(true, |f| !f.write_frontier.is_empty())
        });
    }

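    /// Removes collections whose storage inputs have been dropped (i.e. have
    /// empty read capabilities), as well as all their transitive dependants.
    ///
    /// Only invoked in read-only mode; see `run`.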
    fn prune_dropped_collections(&mut self) {
        // Prune collections that directly depend on dropped storage
        // collections.
        let mut pruned = BTreeSet::new();
        self.collections.retain(|id, c| {
            let input_dropped = c.storage_inputs.iter().any(|id| {
                let frontiers = self
                    .storage_collections
                    .collection_frontiers(*id)
                    .expect("storage collection exists");
                frontiers.read_capabilities.is_empty()
            });

            if input_dropped {
                pruned.insert(*id);
                false
            } else {
                true
            }
        });

        warn!(?pruned, "pruned dependants of dropped storage collections");

        // Transitively prune collections that depend on pruned collections.
        while !pruned.is_empty() {
            let pruned_inputs = std::mem::take(&mut pruned);

            self.collections.retain(|id, c| {
                if c.compute_inputs.iter().any(|id| pruned_inputs.contains(id)) {
                    pruned.insert(*id);
                    false
                } else {
                    true
                }
            });

            warn!(?pruned, "pruned collections with pruned inputs");
        }
    }
}

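/// Runs `step` repeatedly until it stops reporting changes.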
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    loop {
        let mut changed = false;
        step(&mut changed);
        if !changed {
            break;
        }
    }
}

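/// Steps each time in `frontier` back by one, saturating at the minimum
/// timestamp.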
fn step_back_frontier<T: TimestampManipulation>(frontier: &Antichain<T>) -> Antichain<T> {
    frontier
        .iter()
        .map(|t| t.step_back().unwrap_or_else(T::minimum))
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use differential_dataflow::lattice::Lattice;
    use futures::future::BoxFuture;
    use futures::stream::BoxStream;
    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
    use mz_compute_types::sinks::{
        ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
    };
    use mz_compute_types::sources::{SourceInstanceArguments, SourceInstanceDesc};
    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
    use mz_persist_types::{Codec64, ShardId};
    use mz_repr::{RelationDesc, RelationVersion, Row, SqlRelationType, Timestamp};
    use mz_storage_client::client::TimestamplessUpdateBuilder;
    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
    use mz_storage_types::StorageDiff;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::controller::{CollectionMetadata, StorageError};
    use mz_storage_types::parameters::StorageParameters;
    use mz_storage_types::read_holds::ReadHoldError;
    use mz_storage_types::sources::{
        GenericSourceConnection, SourceData, SourceDesc, SourceExportDataConfig,
    };
    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
    use timely::progress::Timestamp as TimelyTimestamp;

    use super::*;

    /// Sentinel timestamp representing a sealed (i.e. empty) frontier in test
    /// specifications.
    const SEALED: u64 = 0x5ea1ed;

    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
        if ts == SEALED {
            Antichain::new()
        } else {
            Antichain::from_elem(ts.into())
        }
    }

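    /// A mock `StorageCollections` implementation that maps collection IDs to
    /// their (read, write) frontiers.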
    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);

    #[async_trait]
    impl StorageCollections for StorageFrontiers {
        type Timestamp = Timestamp;

        async fn initialize_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _init_ids: BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn update_parameters(&self, _config_params: StorageParameters) {
            unimplemented!()
        }

        fn collection_metadata(
            &self,
            _id: GlobalId,
        ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
            unimplemented!()
        }

        fn collections_frontiers(
            &self,
            ids: Vec<GlobalId>,
        ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>
        {
            let mut frontiers = Vec::with_capacity(ids.len());
            for id in ids {
                let (read, write) = self.0.get(&id).ok_or(StorageError::IdentifierMissing(id))?;
                frontiers.push(CollectionFrontiers {
                    id,
                    write_frontier: write.clone(),
                    implied_capability: read.clone(),
                    read_capabilities: read.clone(),
                })
            }
            Ok(frontiers)
        }

        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
            unimplemented!()
        }

        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_parts_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
            unimplemented!()
        }

        fn snapshot(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>
        {
            unimplemented!()
        }

        async fn snapshot_latest(
            &self,
            _id: GlobalId,
        ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn snapshot_cursor(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>,
        >
        where
            Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
        {
            unimplemented!()
        }

        fn snapshot_and_stream(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<
                BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
                StorageError<Self::Timestamp>,
            >,
        > {
            unimplemented!()
        }

        fn create_update_builder(
            &self,
            _id: GlobalId,
        ) -> BoxFuture<
            'static,
            Result<
                TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
                StorageError<Self::Timestamp>,
            >,
        >
        where
            Self::Timestamp: Lattice + Codec64,
        {
            unimplemented!()
        }

        async fn prepare_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _ids_to_add: BTreeSet<GlobalId>,
            _ids_to_drop: BTreeSet<GlobalId>,
            _ids_to_register: BTreeMap<GlobalId, ShardId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn create_collections_for_bootstrap(
            &self,
            _storage_metadata: &StorageMetadata,
            _register_ts: Option<Self::Timestamp>,
            _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
            _migrated_storage_collections: &BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_source_desc(
            &self,
            _ingestion_id: GlobalId,
            _source_desc: SourceDesc,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_export_data_configs(
            &self,
            _source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_connections(
            &self,
            _source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_table_desc(
            &self,
            _existing_collection: GlobalId,
            _new_collection: GlobalId,
            _new_desc: RelationDesc,
            _expected_version: RelationVersion,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn drop_collections_unvalidated(
            &self,
            _storage_metadata: &StorageMetadata,
            _identifiers: Vec<GlobalId>,
        ) {
            unimplemented!()
        }

        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
            unimplemented!()
        }

        fn acquire_read_holds(
            &self,
            desired_holds: Vec<GlobalId>,
        ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
            let mut holds = Vec::with_capacity(desired_holds.len());
            for id in desired_holds {
                let (read, _write) = self
                    .0
                    .get(&id)
                    .ok_or(ReadHoldError::CollectionMissing(id))?;
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                holds.push(ReadHold::with_channel(id, read.clone(), tx));
            }
            Ok(holds)
        }

        fn determine_time_dependence(
            &self,
            _id: GlobalId,
        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
            unimplemented!()
        }
    }

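    /// Builds a `DataflowDescription` with the given export and inputs. IDs
    /// contained in `storage_ids` are treated as storage collections: such
    /// inputs become source imports and such exports become sink exports; all
    /// other IDs become index imports/exports.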
    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: SqlRelationType::empty(),
                };
                (id, (desc, Default::default(), Default::default()))
            })
            .collect();
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: SqlRelationType::empty(),
                    monotonic: Default::default(),
                };
                (id, import)
            })
            .collect();
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = SqlRelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }

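    /// Declares a test case that runs as-of selection over the given storage
    /// frontiers and dataflows, then asserts that each dataflow was assigned
    /// the expected as-of (`<export> <- <inputs> => <expected as-of>`).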
    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
            $( read_only: $read_only:expr, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                #[allow(unused_variables)]
                let read_only = false;
                $( let read_only = $read_only; )?

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                    read_only,
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }

    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"] => 10,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 14,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s2"] => 19,
            "u3" <- ["s3"] => 39,
            "u4" <- ["s4"] => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- [] => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- [] => 15,
        ],
        current_time: 15,
    });

    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });

    testcase!(read_only_dropped_storage_inputs, {
        storage: {
            "s1": (10, 20),
            "s2": (SEALED, SEALED),
            "u4": (10, 20),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => SEALED,
            "u3" <- ["s1", "s2"] => SEALED,
            "u4" <- ["u2"] => SEALED,
        ],
        current_time: 15,
        read_only: true,
    });

    testcase!(github_9273, {
        // Regression test for GitHub issue 9273.
        storage: {
            "s1": (10, 20),
            "u3": (14, 15),
        },
        dataflows: [
            "u1" <- ["s1"] => 14,
            "u2" <- ["u1"] => 19,
            "u3" <- ["u1"] => 14,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(1.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(1.into(), 1.into()),
        },
    });
}