1use std::cell::RefCell;
80use std::collections::{BTreeMap, BTreeSet};
81use std::fmt;
82use std::rc::Rc;
83
84use mz_compute_types::dataflows::DataflowDescription;
85use mz_compute_types::plan::Plan;
86use mz_ore::collections::CollectionExt;
87use mz_ore::soft_panic_or_log;
88use mz_repr::{GlobalId, Timestamp};
89use mz_storage_client::storage_collections::StorageCollections;
90use mz_storage_types::read_holds::ReadHold;
91use mz_storage_types::read_policy::ReadPolicy;
92use timely::PartialOrder;
93use timely::progress::Antichain;
94use tracing::{info, warn};
95
96pub fn run(
102 dataflows: &mut [DataflowDescription<Plan, ()>],
103 read_policies: &BTreeMap<GlobalId, ReadPolicy>,
104 storage_collections: &dyn StorageCollections,
105 current_time: Timestamp,
106 read_only_mode: bool,
107) -> BTreeMap<GlobalId, ReadHold> {
108 let mut storage_read_holds = BTreeMap::new();
111 for dataflow in &*dataflows {
112 for id in dataflow.source_imports.keys() {
113 if !storage_read_holds.contains_key(id) {
114 let read_hold = storage_collections
115 .acquire_read_holds(vec![*id])
116 .expect("storage collection exists")
117 .into_element();
118 storage_read_holds.insert(*id, read_hold);
119 }
120 }
121 }
122
123 let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);
124
125 ctx.prune_sealed_persist_sinks();
129
130 if read_only_mode {
135 ctx.prune_dropped_collections();
136 }
137
138 ctx.apply_upstream_storage_constraints(&storage_read_holds);
140 ctx.apply_downstream_storage_constraints();
141
142 ctx.apply_warmup_constraints();
158
159 ctx.apply_index_read_policy_constraints();
161
162 ctx.apply_index_current_time_constraints();
165
166 for dataflow in dataflows {
168 let first_export = dataflow.export_ids().next();
171 let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
172 dataflow.as_of = Some(as_of);
173 }
174
175 storage_read_holds
176}
177
/// Lower and upper bounds for the as-of of a dataflow.
#[derive(Debug)]
struct AsOfBounds {
    /// The lower as-of bound.
    lower: Antichain<Timestamp>,
    /// The upper as-of bound.
    upper: Antichain<Timestamp>,
    /// Whether the bounds are fixed. Sealed bounds reject all further
    /// constraints (see `Constraint::apply`).
    sealed: bool,
}
186
187impl AsOfBounds {
188 fn single(frontier: Antichain<Timestamp>) -> Self {
190 Self {
191 lower: frontier.clone(),
192 upper: frontier,
193 sealed: false,
194 }
195 }
196
197 fn get(&self, type_: BoundType) -> &Antichain<Timestamp> {
199 match type_ {
200 BoundType::Lower => &self.lower,
201 BoundType::Upper => &self.upper,
202 }
203 }
204}
205
206impl Default for AsOfBounds {
207 fn default() -> Self {
208 Self {
209 lower: Antichain::from_elem(Timestamp::MIN),
210 upper: Antichain::new(),
211 sealed: false,
212 }
213 }
214}
215
216impl fmt::Display for AsOfBounds {
217 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218 write!(
219 f,
220 "[{:?} .. {:?}]",
221 self.lower.elements(),
222 self.upper.elements()
223 )
224 }
225}
226
/// Identifies one of the two bounds in an `AsOfBounds`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    /// The lower as-of bound.
    Lower,
    /// The upper as-of bound.
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Lower => "lower",
            Self::Upper => "upper",
        };
        f.write_str(name)
    }
}
242
/// The strictness of a `Constraint`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// A constraint that must apply; failure to do so triggers a soft panic
    /// (see `Context::apply_constraint`).
    Hard,
    /// A best-effort constraint; failure to apply it is only logged.
    Soft,
}
253
/// A constraint on one bound of an `AsOfBounds`.
#[derive(Debug)]
struct Constraint<'a> {
    /// Whether the constraint is hard or soft.
    type_: ConstraintType,
    /// Which bound the constraint applies to.
    bound_type: BoundType,
    /// The frontier to constrain the bound by.
    frontier: &'a Antichain<Timestamp>,
    /// A human-readable description of the constraint, used for logging.
    reason: &'a str,
}
267
impl Constraint<'_> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns `Ok(changed)` if the constraint was applied (possibly as a
    /// no-op), and `Err(changed)` if it could not be applied because it would
    /// have pushed the constrained bound past the opposite one. In the error
    /// case the bounds are sealed — after first collapsing the constrained
    /// bound onto the opposite bound if the two differ — so no later
    /// constraint can modify them.
    ///
    /// The boolean payload in both cases reports whether the bounds were
    /// modified.
    fn apply(&self, bounds: &mut AsOfBounds) -> Result<bool, bool> {
        // Sealed bounds never change again.
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                // The requested lower bound lies beyond the current upper
                // bound: cannot apply. Seal, collapsing `lower` up to `upper`
                // if necessary.
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    // No-op: the lower bound already covers the frontier.
                    Ok(false)
                } else {
                    // Tighten the lower bound to the requested frontier.
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                // Symmetric to the `Lower` case above.
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    // No-op: the upper bound is already at most the frontier.
                    Ok(false)
                } else {
                    // Tighten the upper bound to the requested frontier.
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}
319
/// State tracked for a single compute collection (one dataflow export).
struct Collection<'a> {
    /// Storage collections imported by the collection's dataflow.
    storage_inputs: Vec<GlobalId>,
    /// Indexes imported by the collection's dataflow.
    compute_inputs: Vec<GlobalId>,
    /// The read policy assigned to this collection, if any.
    read_policy: Option<&'a ReadPolicy>,
    /// The as-of bounds, shared with all other exports of the same dataflow
    /// (see `Context::new`).
    bounds: Rc<RefCell<AsOfBounds>>,
    /// Whether this collection is exported as an index.
    is_index: bool,
}
332
/// Working state for as-of selection over a set of dataflows.
struct Context<'a> {
    /// All compute collections under consideration, keyed by export id.
    collections: BTreeMap<GlobalId, Collection<'a>>,
    /// Access to storage collection frontiers and read holds.
    storage_collections: &'a dyn StorageCollections,
    /// The current time; used for index current-time constraints and as a
    /// fallback write frontier for read-policy constraints.
    current_time: Timestamp,
}
339
340impl<'a> Context<'a> {
341 fn new(
343 dataflows: &[DataflowDescription<Plan, ()>],
344 storage_collections: &'a dyn StorageCollections,
345 read_policies: &'a BTreeMap<GlobalId, ReadPolicy>,
346 current_time: Timestamp,
347 ) -> Self {
348 let mut collections = BTreeMap::new();
351 for dataflow in dataflows {
352 let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
353 let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();
354
355 let bounds = match dataflow.as_of.clone() {
356 Some(frontier) => AsOfBounds::single(frontier),
357 None => AsOfBounds::default(),
358 };
359 let bounds = Rc::new(RefCell::new(bounds));
360
361 for id in dataflow.export_ids() {
362 let collection = Collection {
363 storage_inputs: storage_inputs.clone(),
364 compute_inputs: compute_inputs.clone(),
365 read_policy: read_policies.get(&id),
366 bounds: Rc::clone(&bounds),
367 is_index: dataflow.index_exports.contains_key(&id),
368 };
369 collections.insert(id, collection);
370 }
371 }
372
373 Self {
374 collections,
375 storage_collections,
376 current_time,
377 }
378 }
379
380 fn expect_collection(&self, id: GlobalId) -> &Collection<'_> {
386 self.collections
387 .get(&id)
388 .unwrap_or_else(|| panic!("collection missing: {id}"))
389 }
390
391 fn apply_constraint(&self, id: GlobalId, constraint: Constraint) -> bool {
395 let collection = self.expect_collection(id);
396 let mut bounds = collection.bounds.borrow_mut();
397 match constraint.apply(&mut bounds) {
398 Ok(changed) => {
399 if changed {
400 info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
401 }
402 changed
403 }
404 Err(changed) => {
405 match constraint.type_ {
406 ConstraintType::Hard => {
407 soft_panic_or_log!(
408 "failed to apply hard as-of constraint \
409 (id={id}, bounds={bounds}, constraint={constraint:?})"
410 );
411 }
412 ConstraintType::Soft => {
413 info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
414 }
415 }
416 changed
417 }
418 }
419 }
420
421 fn apply_upstream_storage_constraints(
429 &self,
430 storage_read_holds: &BTreeMap<GlobalId, ReadHold>,
431 ) {
432 for (id, collection) in &self.collections {
434 for input_id in &collection.storage_inputs {
435 let read_hold = &storage_read_holds[input_id];
436 let constraint = Constraint {
437 type_: ConstraintType::Hard,
438 bound_type: BoundType::Lower,
439 frontier: read_hold.since(),
440 reason: &format!("storage input {input_id} read frontier"),
441 };
442 self.apply_constraint(*id, constraint);
443 }
444 }
445
446 self.propagate_bounds_downstream(BoundType::Lower);
448 }
449
450 fn apply_downstream_storage_constraints(&self) {
476 for id in self.collections.keys() {
478 let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
479 continue;
480 };
481
482 let collection_empty =
483 PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
484 let upper = if collection_empty {
485 frontiers.read_capabilities
486 } else {
487 step_back_frontier(&frontiers.write_frontier)
488 };
489
490 let constraint = Constraint {
491 type_: ConstraintType::Hard,
492 bound_type: BoundType::Upper,
493 frontier: &upper,
494 reason: &format!("storage export {id} write frontier"),
495 };
496 self.apply_constraint(*id, constraint);
497 }
498
499 self.propagate_bounds_upstream(BoundType::Upper);
501 }
502
503 fn apply_warmup_constraints(&self) {
512 let mut write_frontiers = BTreeMap::new();
514 for (id, collection) in &self.collections {
515 let storage_frontiers = self
516 .storage_collections
517 .collections_frontiers(collection.storage_inputs.clone())
518 .expect("storage collections exist");
519
520 let mut write_frontier = Antichain::new();
521 for frontiers in storage_frontiers {
522 write_frontier.extend(frontiers.write_frontier);
523 }
524
525 write_frontiers.insert(*id, write_frontier);
526 }
527
528 fixpoint(|changed| {
530 for (id, collection) in &self.collections {
531 let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
532 for input_id in &collection.compute_inputs {
533 let frontier = &write_frontiers[input_id];
534 *changed |= write_frontier.extend(frontier.iter().cloned());
535 }
536 write_frontiers.insert(*id, write_frontier);
537 }
538 });
539
540 for (id, write_frontier) in write_frontiers {
542 let upper = step_back_frontier(&write_frontier);
543 let constraint = Constraint {
544 type_: ConstraintType::Soft,
545 bound_type: BoundType::Upper,
546 frontier: &upper,
547 reason: &format!(
548 "warmup frontier derived from storage write frontier {:?}",
549 write_frontier.elements()
550 ),
551 };
552 self.apply_constraint(id, constraint);
553 }
554
555 self.propagate_bounds_upstream(BoundType::Upper);
557 }
558
559 fn apply_index_read_policy_constraints(&self) {
569 let mut write_frontiers = BTreeMap::new();
576 for (id, collection) in &self.collections {
577 let storage_frontiers = self
578 .storage_collections
579 .collections_frontiers(collection.storage_inputs.clone())
580 .expect("storage collections exist");
581
582 let mut write_frontier = Antichain::new();
583 for frontiers in storage_frontiers {
584 write_frontier.extend(frontiers.write_frontier);
585 }
586
587 write_frontiers.insert(*id, write_frontier);
588 }
589
590 fixpoint(|changed| {
592 for (id, collection) in &self.collections {
593 let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
594 for input_id in &collection.compute_inputs {
595 let frontier = &write_frontiers[input_id];
596 *changed |= write_frontier.extend(frontier.iter().cloned());
597 }
598 write_frontiers.insert(*id, write_frontier);
599 }
600 });
601
602 for (id, collection) in &self.collections {
604 if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
605 let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
606 if write_frontier.is_empty() {
607 write_frontier = Antichain::from_elem(self.current_time.clone());
608 }
609 let upper = read_policy.frontier(write_frontier.borrow());
610 let constraint = Constraint {
611 type_: ConstraintType::Soft,
612 bound_type: BoundType::Upper,
613 frontier: &upper,
614 reason: &format!(
615 "read policy applied to write frontier {:?}",
616 write_frontier.elements()
617 ),
618 };
619 self.apply_constraint(*id, constraint);
620 }
621 }
622
623 self.propagate_bounds_upstream(BoundType::Upper);
625 }
626
627 fn apply_index_current_time_constraints(&self) {
634 let upper = Antichain::from_elem(self.current_time.clone());
636 for (id, collection) in &self.collections {
637 if collection.is_index {
638 let constraint = Constraint {
639 type_: ConstraintType::Soft,
640 bound_type: BoundType::Upper,
641 frontier: &upper,
642 reason: "index current time",
643 };
644 self.apply_constraint(*id, constraint);
645 }
646 }
647
648 self.propagate_bounds_upstream(BoundType::Upper);
650 }
651
652 fn propagate_bounds_downstream(&self, bound_type: BoundType) {
654 let constraint_type = match bound_type {
657 BoundType::Lower => ConstraintType::Hard,
658 BoundType::Upper => ConstraintType::Soft,
659 };
660
661 fixpoint(|changed| {
664 self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);
665
666 if bound_type == BoundType::Upper {
669 self.propagate_bounds_upstream_inner(
670 BoundType::Upper,
671 ConstraintType::Hard,
672 changed,
673 );
674 }
675 });
676 }
677
678 fn propagate_bounds_downstream_inner(
679 &self,
680 bound_type: BoundType,
681 constraint_type: ConstraintType,
682 changed: &mut bool,
683 ) {
684 for (id, collection) in &self.collections {
685 for input_id in &collection.compute_inputs {
686 let input_collection = self.expect_collection(*input_id);
687 let bounds = input_collection.bounds.borrow();
688 let constraint = Constraint {
689 type_: constraint_type,
690 bound_type,
691 frontier: bounds.get(bound_type),
692 reason: &format!("upstream {input_id} {bound_type} as-of bound"),
693 };
694 *changed |= self.apply_constraint(*id, constraint);
695 }
696 }
697 }
698
699 fn propagate_bounds_upstream(&self, bound_type: BoundType) {
701 let constraint_type = match bound_type {
704 BoundType::Lower => ConstraintType::Soft,
705 BoundType::Upper => ConstraintType::Hard,
706 };
707
708 fixpoint(|changed| {
711 self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);
712
713 if bound_type == BoundType::Lower {
716 self.propagate_bounds_downstream_inner(
717 BoundType::Lower,
718 ConstraintType::Hard,
719 changed,
720 );
721 }
722 });
723 }
724
725 fn propagate_bounds_upstream_inner(
726 &self,
727 bound_type: BoundType,
728 constraint_type: ConstraintType,
729 changed: &mut bool,
730 ) {
731 for (id, collection) in self.collections.iter().rev() {
732 let bounds = collection.bounds.borrow();
733 for input_id in &collection.compute_inputs {
734 let constraint = Constraint {
735 type_: constraint_type,
736 bound_type,
737 frontier: bounds.get(bound_type),
738 reason: &format!("downstream {id} {bound_type} as-of bound"),
739 };
740 *changed |= self.apply_constraint(*input_id, constraint);
741 }
742 }
743 }
744
745 fn best_as_of(&self, id: GlobalId) -> Antichain<Timestamp> {
752 if let Some(collection) = self.collections.get(&id) {
753 let bounds = collection.bounds.borrow();
754 bounds.upper.clone()
755 } else {
756 Antichain::new()
757 }
758 }
759
760 fn prune_sealed_persist_sinks(&mut self) {
768 self.collections.retain(|id, _| {
769 self.storage_collections
770 .collection_frontiers(*id)
771 .map_or(true, |f| !f.write_frontier.is_empty())
772 });
773 }
774
775 fn prune_dropped_collections(&mut self) {
782 let mut pruned = BTreeSet::new();
784 self.collections.retain(|id, c| {
785 let input_dropped = c.storage_inputs.iter().any(|id| {
786 let frontiers = self
787 .storage_collections
788 .collection_frontiers(*id)
789 .expect("storage collection exists");
790 frontiers.read_capabilities.is_empty()
791 });
792
793 if input_dropped {
794 pruned.insert(*id);
795 false
796 } else {
797 true
798 }
799 });
800
801 warn!(?pruned, "pruned dependants of dropped storage collections");
802
803 while !pruned.is_empty() {
805 let pruned_inputs = std::mem::take(&mut pruned);
806
807 self.collections.retain(|id, c| {
808 if c.compute_inputs.iter().any(|id| pruned_inputs.contains(id)) {
809 pruned.insert(*id);
810 false
811 } else {
812 true
813 }
814 });
815
816 warn!(?pruned, "pruned collections with pruned inputs");
817 }
818 }
819}
820
/// Runs `step` repeatedly until it reports no further changes.
///
/// `step` must set the provided flag to `true` whenever it changed any state;
/// the loop terminates after the first invocation that leaves the flag
/// `false`.
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    let mut changed = true;
    while changed {
        changed = false;
        step(&mut changed);
    }
}
831
832fn step_back_frontier(frontier: &Antichain<Timestamp>) -> Antichain<Timestamp> {
837 frontier
838 .iter()
839 .map(|t| t.step_back().unwrap_or(Timestamp::MIN))
840 .collect()
841}
842
843#[cfg(test)]
844mod tests {
845 use std::collections::BTreeSet;
846
847 use async_trait::async_trait;
848 use futures::future::BoxFuture;
849 use futures::stream::BoxStream;
850 use mz_compute_types::dataflows::{IndexDesc, IndexImport};
851 use mz_compute_types::sinks::ComputeSinkConnection;
852 use mz_compute_types::sinks::ComputeSinkDesc;
853 use mz_compute_types::sinks::MaterializedViewSinkConnection;
854 use mz_compute_types::sources::SourceInstanceArguments;
855 use mz_compute_types::sources::SourceInstanceDesc;
856 use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
857 use mz_persist_types::ShardId;
858 use mz_repr::{RelationDesc, RelationVersion, Row, SqlRelationType};
859 use mz_repr::{ReprRelationType, Timestamp};
860 use mz_storage_client::client::TimestamplessUpdateBuilder;
861 use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
862 use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
863 use mz_storage_types::StorageDiff;
864 use mz_storage_types::controller::{CollectionMetadata, StorageError};
865 use mz_storage_types::errors::CollectionMissing;
866 use mz_storage_types::parameters::StorageParameters;
867 use mz_storage_types::sources::SourceData;
868 use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
869
870 use super::*;
871
872 const SEALED: u64 = 0x5ea1ed;
873
874 fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
875 if ts == SEALED {
876 Antichain::new()
877 } else {
878 Antichain::from_elem(ts.into())
879 }
880 }
881
    /// Test double for `StorageCollections`: maps collection ids to pairs of
    /// `(read frontier, write frontier)`.
    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);
884
885 #[async_trait]
886 impl StorageCollections for StorageFrontiers {
887 async fn initialize_state(
888 &self,
889 _txn: &mut (dyn StorageTxn + Send),
890 _init_ids: BTreeSet<GlobalId>,
891 ) -> Result<(), StorageError> {
892 unimplemented!()
893 }
894
895 fn update_parameters(&self, _config_params: StorageParameters) {
896 unimplemented!()
897 }
898
899 fn collection_metadata(
900 &self,
901 _id: GlobalId,
902 ) -> Result<CollectionMetadata, CollectionMissing> {
903 unimplemented!()
904 }
905
906 fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
907 unimplemented!()
908 }
909
910 fn collections_frontiers(
911 &self,
912 ids: Vec<GlobalId>,
913 ) -> Result<Vec<CollectionFrontiers>, CollectionMissing> {
914 let mut frontiers = Vec::with_capacity(ids.len());
915 for id in ids {
916 let (read, write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
917 frontiers.push(CollectionFrontiers {
918 id,
919 write_frontier: write.clone(),
920 implied_capability: read.clone(),
921 read_capabilities: read.clone(),
922 })
923 }
924 Ok(frontiers)
925 }
926
927 fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers> {
928 unimplemented!()
929 }
930
931 fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError> {
932 unimplemented!()
933 }
934
935 async fn snapshot_stats(
936 &self,
937 _id: GlobalId,
938 _as_of: Antichain<Timestamp>,
939 ) -> Result<SnapshotStats, StorageError> {
940 unimplemented!()
941 }
942
943 async fn snapshot_parts_stats(
944 &self,
945 _id: GlobalId,
946 _as_of: Antichain<Timestamp>,
947 ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>> {
948 unimplemented!()
949 }
950
951 fn snapshot(
952 &self,
953 _id: GlobalId,
954 _as_of: Timestamp,
955 ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
956 unimplemented!()
957 }
958
959 async fn snapshot_latest(&self, _id: GlobalId) -> Result<Vec<Row>, StorageError> {
960 unimplemented!()
961 }
962
963 fn snapshot_cursor(
964 &self,
965 _id: GlobalId,
966 _as_of: Timestamp,
967 ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>> {
968 unimplemented!()
969 }
970
971 fn snapshot_and_stream(
972 &self,
973 _id: GlobalId,
974 _as_of: Timestamp,
975 ) -> BoxFuture<
976 'static,
977 Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
978 > {
979 unimplemented!()
980 }
981
982 fn create_update_builder(
983 &self,
984 _id: GlobalId,
985 ) -> BoxFuture<
986 'static,
987 Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
988 > {
989 unimplemented!()
990 }
991
992 async fn prepare_state(
993 &self,
994 _txn: &mut (dyn StorageTxn + Send),
995 _ids_to_add: BTreeSet<GlobalId>,
996 _ids_to_drop: BTreeSet<GlobalId>,
997 _ids_to_register: BTreeMap<GlobalId, ShardId>,
998 ) -> Result<(), StorageError> {
999 unimplemented!()
1000 }
1001
1002 async fn create_collections_for_bootstrap(
1003 &self,
1004 _storage_metadata: &StorageMetadata,
1005 _register_ts: Option<Timestamp>,
1006 _collections: Vec<(GlobalId, CollectionDescription)>,
1007 _migrated_storage_collections: &BTreeSet<GlobalId>,
1008 ) -> Result<(), StorageError> {
1009 unimplemented!()
1010 }
1011
1012 async fn alter_table_desc(
1013 &self,
1014 _existing_collection: GlobalId,
1015 _new_collection: GlobalId,
1016 _new_desc: RelationDesc,
1017 _expected_version: RelationVersion,
1018 ) -> Result<(), StorageError> {
1019 unimplemented!()
1020 }
1021
1022 fn drop_collections_unvalidated(
1023 &self,
1024 _storage_metadata: &StorageMetadata,
1025 _identifiers: Vec<GlobalId>,
1026 ) {
1027 unimplemented!()
1028 }
1029
1030 fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy)>) {
1031 unimplemented!()
1032 }
1033
1034 fn acquire_read_holds(
1035 &self,
1036 desired_holds: Vec<GlobalId>,
1037 ) -> Result<Vec<ReadHold>, CollectionMissing> {
1038 let mut holds = Vec::with_capacity(desired_holds.len());
1039 for id in desired_holds {
1040 let (read, _write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
1041 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
1042 holds.push(ReadHold::with_channel(id, read.clone(), tx));
1043 }
1044 Ok(holds)
1045 }
1046
1047 fn determine_time_dependence(
1048 &self,
1049 _id: GlobalId,
1050 ) -> Result<Option<TimeDependence>, TimeDependenceError> {
1051 unimplemented!()
1052 }
1053
1054 fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
1055 unimplemented!()
1056 }
1057 }
1058
    /// Constructs a test `DataflowDescription` with the given export and
    /// inputs.
    ///
    /// Ids contained in `storage_ids` are treated as storage collections:
    /// such inputs become source imports and such an export becomes a sink
    /// export. All other inputs/exports become index imports/exports.
    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        // Inputs that are storage collections become source imports.
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: SqlRelationType::empty(),
                };
                (
                    id,
                    mz_compute_types::dataflows::SourceImport {
                        desc,
                        monotonic: Default::default(),
                        with_snapshot: true,
                        upper: Default::default(),
                    },
                )
            })
            .collect();
        // All other inputs become index imports.
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: ReprRelationType::empty(),
                    monotonic: Default::default(),
                    with_snapshot: true,
                };
                (id, import)
            })
            .collect();
        // A non-storage export becomes an index export...
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = ReprRelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        // ...while a storage export becomes a materialized-view sink export.
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        // `as_of: None` leaves as-of selection entirely to the code under
        // test.
        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }
1152
    /// Defines a test that runs as-of selection (`super::run`) over a set of
    /// dataflows and storage collections, asserting on the selected as-ofs.
    ///
    /// Syntax:
    ///  * `storage`: storage collections, as `"id": (read_ts, write_ts)`,
    ///  * `dataflows`: entries of the form
    ///    `"export" <- [input ids] => expected_as_of`,
    ///  * `current_time`: the current timestamp,
    ///  * `read_policies` (optional): read policies keyed by collection id,
    ///  * `read_only` (optional): whether to run in read-only mode
    ///    (default `false`).
    ///
    /// Timestamps may be the `SEALED` sentinel to denote an empty frontier.
    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
            $( read_only: $read_only:expr, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                #[allow(unused_variables)]
                let read_only = false;
                $( let read_only = $read_only; )?

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                    read_only,
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }
1206
    // As-ofs are bounded from below by the read frontiers of storage inputs,
    // including transitively through compute (index) inputs (u4).
    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"] => 10,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    // As-ofs are bounded from above by the write frontiers of existing
    // storage exports (u3, u4), and these bounds propagate upstream (u2).
    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 14,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

    // Warmup constraints pull as-ofs close behind storage input write
    // frontiers; dataflows feeding a common dependant (u5, u6) settle on a
    // shared frontier.
    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s2"] => 19,
            "u3" <- ["s3"] => 39,
            "u4" <- ["s4"] => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    // Index as-ofs respect read policies applied to input write frontiers;
    // an index without inputs (u5) uses the current time instead, and hard
    // constraints (u6's downstream storage export) take precedence.
    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- [] => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    // Index as-ofs are bounded from above by the current time, except where
    // hard constraints (storage exports u4/u5, input read frontiers u2)
    // force other values.
    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- [] => 15,
        ],
        current_time: 15,
    });

    // A dataflow sinking into a sealed storage collection (empty write
    // frontier) is pruned and gets the empty as-of.
    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });

    // In read-only mode, collections reading from dropped storage inputs
    // (empty read capabilities, "s2") are pruned, along with their
    // transitive dependants (u4).
    testcase!(read_only_dropped_storage_inputs, {
        storage: {
            "s1": (10, 20),
            "s2": (SEALED, SEALED),
            "u4": (10, 20),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => SEALED,
            "u3" <- ["s1", "s2"] => SEALED,
            "u4" <- ["u2"] => SEALED,
        ],
        current_time: 15,
        read_only: true,
    });

    // Regression test; named after the GitHub issue it reproduces — see the
    // referenced issue for the original failure scenario.
    testcase!(github_9273, {
        storage: {
            "s1": (10, 20),
            "u3": (14, 15),
        },
        dataflows: [
            "u1" <- ["s1"] => 14,
            "u2" <- ["u1"] => 19,
            "u3" <- ["u1"] => 14,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(1.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(1.into(), 1.into()),
        },
    });
1340}