//! Selection of as-of frontiers for compute dataflows.
//!
//! Given a set of dataflows, the read policies of their exports, and the frontiers of
//! the storage collections they depend on, this module assigns each dataflow an as-of
//! frontier that satisfies all hard correctness constraints and as many soft
//! optimization constraints as possible.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::info;

/// Selects as-of frontiers for the given dataflows and stores them in their `as_of`
/// fields.
///
/// Returns the read holds acquired on the dataflows' storage inputs, keyed by
/// collection ID. Keeping these holds ensures the storage inputs remain readable at
/// the selected as-ofs.
pub fn run<T: TimestampManipulation>(
    dataflows: &mut [DataflowDescription<Plan<T>, (), T>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy<T>>,
    storage_collections: &dyn StorageCollections<Timestamp = T>,
    current_time: T,
) -> BTreeMap<GlobalId, ReadHold<T>> {
    // Acquire read holds on the dataflows' storage inputs, so their read frontiers
    // cannot advance while we select as-ofs.
    let mut storage_read_holds = BTreeMap::new();
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows that sink into sealed storage collections can never produce output,
    // so their as-ofs shouldn't constrain anything else.
    ctx.prune_sealed_persist_sinks();

    // Apply the hard constraints first, then the soft constraints in order of
    // decreasing importance.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    ctx.apply_warmup_constraints();
    ctx.apply_index_read_policy_constraints();
    ctx.apply_index_current_time_constraints();

    // Select the final as-ofs. All exports of a dataflow share the same bounds, so
    // consulting the first export is enough.
    for dataflow in dataflows {
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

/// Bounds for the possible as-of values of a dataflow.
#[derive(Debug)]
struct AsOfBounds<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    /// Whether these bounds can still change.
    sealed: bool,
}

impl<T: Clone> AsOfBounds<T> {
    /// Creates `AsOfBounds` that only allow the given `frontier`.
    fn single(frontier: Antichain<T>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Returns the bound of the given [`BoundType`].
    fn get(&self, type_: BoundType) -> &Antichain<T> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl<T: Timestamp> Default for AsOfBounds<T> {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(T::minimum()),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl<T: fmt::Debug> fmt::Display for AsOfBounds<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

/// Types of as-of bounds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

/// Types of as-of constraints.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints must be respected; failing to apply one is an error.
    Hard,
    /// Soft constraints are applied on a best-effort basis.
    Soft,
}

/// A constraint on the as-of bounds of a collection.
#[derive(Debug)]
struct Constraint<'a, T> {
    type_: ConstraintType,
    /// Which bound the constraint applies to.
    bound_type: BoundType,
    /// The frontier the bound is constrained by.
    frontier: &'a Antichain<T>,
    /// A human-readable description of the reason for the constraint, for logging.
    reason: &'a str,
}

impl<T: Timestamp> Constraint<'_, T> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns `Ok(changed)` if the constraint was applied, where `changed` reports
    /// whether the bounds were modified as a result. Applying a constraint to sealed
    /// bounds is a no-op that reports no change.
    ///
    /// Applying a constraint fails if its frontier conflicts with the opposite bound,
    /// i.e. a lower constraint beyond the upper bound or an upper constraint below
    /// the lower bound. In this case the bounds collapse to the bound closest to the
    /// constraint and become sealed, and `Err(changed)` is returned.
    fn apply(&self, bounds: &mut AsOfBounds<T>) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

/// State tracked for a compute collection during as-of selection.
struct Collection<'a, T> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy<T>>,
    /// The as-of bounds, shared by all exports of the same dataflow.
    bounds: Rc<RefCell<AsOfBounds<T>>>,
    /// Whether this collection is an index export.
    is_index: bool,
}

/// The context used for as-of selection.
struct Context<'a, T> {
    collections: BTreeMap<GlobalId, Collection<'a, T>>,
    storage_collections: &'a dyn StorageCollections<Timestamp = T>,
    current_time: T,
}

impl<'a, T: TimestampManipulation> Context<'a, T> {
    /// Initializes a context for the given dataflows.
    fn new(
        dataflows: &[DataflowDescription<Plan<T>, (), T>],
        storage_collections: &'a dyn StorageCollections<Timestamp = T>,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy<T>>,
        current_time: T,
    ) -> Self {
        // Register all dataflow exports. The exports of a dataflow share a single
        // `AsOfBounds` instance, initialized from the dataflow's as-of if it is
        // already fixed.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

    /// Returns the collection with the given ID, panicking if it doesn't exist.
    fn expect_collection(&self, id: GlobalId) -> &Collection<T> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

    /// Applies the given as-of constraint to the identified collection.
    ///
    /// Returns whether the collection's bounds changed as a result.
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint<T>) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

    /// Applies hard as-of constraints imposed by the read frontiers of the dataflows'
    /// storage inputs.
    ///
    /// A dataflow's as-of must be at or beyond the read frontier of each of its
    /// storage inputs, or the dataflow would be unable to read its input data.
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold<T>>,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // The same limitation applies transitively to dataflows reading from the
        // constrained ones.
        self.propagate_bounds_downstream(BoundType::Lower);
    }

    /// Applies hard as-of constraints imposed by the write frontiers of the storage
    /// collections the dataflows export to.
    ///
    /// A dataflow exporting to a storage collection must not skip over times the
    /// collection is expected to contain, so its as-of is bounded above by the
    /// collection's write frontier stepped back by one. If nothing has been written
    /// to the collection yet, the read frontier is used instead.
    fn apply_downstream_storage_constraints(&self) {
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                Antichain::from_iter(
                    frontiers
                        .write_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // The constraint applies transitively to the dataflows' inputs.
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Applies soft as-of constraints that allow dataflows to hydrate immediately
    /// from their storage inputs.
    ///
    /// Choosing an as-of less than a storage input's write frontier means the
    /// dataflow can start computing right away, rather than having to wait for the
    /// input to produce more data first.
    fn apply_warmup_constraints(&self) {
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let frontiers = self
                    .storage_collections
                    .collection_frontiers(*input_id)
                    .expect("storage collection exists");
                let upper = step_back_frontier(&frontiers.write_frontier);
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!("storage input {input_id} warmup frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate the constraint downstream, so dependent dataflows can hydrate
        // immediately as well.
        self.propagate_bounds_downstream(BoundType::Upper);
    }

    /// Applies soft as-of constraints derived from the read policies of indexes.
    ///
    /// Indexes have no write frontiers tracked by the storage controller, so we apply
    /// each index's read policy to a hypothetical write frontier, derived from the
    /// write frontiers of the index's transitive storage inputs. If an index has no
    /// storage inputs, the current time is used instead.
    fn apply_index_read_policy_constraints(&self) {
        // Seed each collection's hypothetical write frontier with the write frontiers
        // of its direct storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate the frontiers through compute dependencies, until fixpoint.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let write_frontier = write_frontiers.get_mut(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let input_collection = self.expect_collection(*input_id);
                    let bounds = input_collection.bounds.borrow();
                    *changed |= write_frontier.extend(bounds.upper.iter().cloned());
                }
            }
        });

        // Apply the read policies to the hypothetical write frontiers.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Applies soft as-of constraints ensuring that indexes contain the current time.
    ///
    /// An index whose as-of is beyond the current time would not be immediately
    /// readable, forcing queries against it to block.
    fn apply_index_current_time_constraints(&self) {
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Propagates as-of bounds through the dataflow graph, from inputs to outputs.
    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
        // Lower bounds propagate downstream as hard constraints: a dataflow cannot
        // have an as-of before the as-ofs of its compute inputs, since those are not
        // readable earlier. Upper bounds propagate downstream only as soft
        // constraints.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Hard,
            BoundType::Upper => ConstraintType::Soft,
        };

        fixpoint(|changed| {
            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);

            // Propagating upper bounds downstream can break the invariant that upper
            // bounds propagate upstream as hard constraints, so re-establish it.
            if bound_type == BoundType::Upper {
                self.propagate_bounds_upstream_inner(
                    BoundType::Upper,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_downstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.compute_inputs {
                let input_collection = self.expect_collection(*input_id);
                let bounds = input_collection.bounds.borrow();
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*id, constraint);
            }
        }
    }

    /// Propagates as-of bounds through the dataflow graph, from outputs to inputs.
    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
        // Upper bounds propagate upstream as hard constraints: a compute input must
        // have an as-of at or before the as-ofs of its dependants, so it is readable
        // at those as-ofs. Lower bounds propagate upstream only as soft constraints.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Soft,
            BoundType::Upper => ConstraintType::Hard,
        };

        fixpoint(|changed| {
            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);

            // Propagating lower bounds upstream can break the invariant that lower
            // bounds propagate downstream as hard constraints, so re-establish it.
            if bound_type == BoundType::Lower {
                self.propagate_bounds_downstream_inner(
                    BoundType::Lower,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_upstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        // Iterate in reverse ID order, on the assumption that downstream collections
        // tend to have greater IDs; this usually converges faster.
        for (id, collection) in self.collections.iter().rev() {
            let bounds = collection.bounds.borrow();
            for input_id in &collection.compute_inputs {
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("downstream {id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*input_id, constraint);
            }
        }
    }

    /// Returns the best as-of for the identified collection, given its current
    /// bounds.
    ///
    /// The upper bound reflects all hard constraints and all soft constraints that
    /// could be satisfied, so it is the best choice. For collections that have been
    /// pruned, the empty frontier is returned.
    fn best_as_of(&self, id: GlobalId) -> Antichain<T> {
        if let Some(collection) = self.collections.get(&id) {
            let bounds = collection.bounds.borrow();
            bounds.upper.clone()
        } else {
            Antichain::new()
        }
    }

    /// Removes collections that export to sealed storage collections.
    ///
    /// A dataflow whose target collection has an empty write frontier can never
    /// produce output, so its as-of is irrelevant and should not constrain the
    /// as-ofs of other dataflows.
    fn prune_sealed_persist_sinks(&mut self) {
        self.collections.retain(|id, _| {
            self.storage_collections
                .collection_frontiers(*id)
                .map_or(true, |f| !f.write_frontier.is_empty())
        });
    }
}

/// Runs `step` in a loop until it stops reporting changes.
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    loop {
        let mut changed = false;
        step(&mut changed);
        if !changed {
            break;
        }
    }
}

/// Steps each time in the given frontier back by one, saturating at the minimum
/// timestamp.
fn step_back_frontier<T: TimestampManipulation>(frontier: &Antichain<T>) -> Antichain<T> {
    frontier
        .iter()
        .map(|t| t.step_back().unwrap_or_else(T::minimum))
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use differential_dataflow::lattice::Lattice;
    use futures::future::BoxFuture;
    use futures::stream::BoxStream;
    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
    use mz_compute_types::sinks::{
        ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
    };
    use mz_compute_types::sources::{SourceInstanceArguments, SourceInstanceDesc};
    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
    use mz_persist_types::{Codec64, ShardId};
    use mz_repr::{RelationDesc, RelationType, RelationVersion, Row, Timestamp};
    use mz_storage_client::client::TimestamplessUpdateBuilder;
    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
    use mz_storage_types::StorageDiff;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::controller::{CollectionMetadata, StorageError};
    use mz_storage_types::parameters::StorageParameters;
    use mz_storage_types::read_holds::ReadHoldError;
    use mz_storage_types::sources::{
        GenericSourceConnection, SourceData, SourceDesc, SourceExportDataConfig,
    };
    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
    use timely::progress::Timestamp as TimelyTimestamp;

    use super::*;

    /// A sentinel timestamp used in test specs to denote a sealed (empty) frontier.
    const SEALED: u64 = 0x5ea1ed;

    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
        if ts == SEALED {
            Antichain::new()
        } else {
            Antichain::from_elem(ts.into())
        }
    }

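    // A minimal sanity check of `Constraint::apply`, illustrating the semantics
    // documented on that method: constraints tighten bounds until a conflicting
    // constraint collapses and seals them. (Added here for illustration; not part
    // of the macro-generated cases below.)
    #[mz_ore::test]
    fn constraint_apply_semantics() {
        let mut bounds: AsOfBounds<Timestamp> = AsOfBounds::default();

        // Raising the lower bound succeeds and reports a change.
        let lower = Antichain::from_elem(10.into());
        let raise_lower = Constraint {
            type_: ConstraintType::Hard,
            bound_type: BoundType::Lower,
            frontier: &lower,
            reason: "test",
        };
        assert_eq!(raise_lower.apply(&mut bounds), Ok(true));

        // An upper constraint below the lower bound conflicts: the bounds collapse
        // to the lower bound and become sealed.
        let upper = Antichain::from_elem(5.into());
        let conflicting_upper = Constraint {
            type_: ConstraintType::Soft,
            bound_type: BoundType::Upper,
            frontier: &upper,
            reason: "test",
        };
        assert_eq!(conflicting_upper.apply(&mut bounds), Err(true));
        assert!(bounds.sealed);

        // Sealed bounds ignore further constraints.
        assert_eq!(raise_lower.apply(&mut bounds), Ok(false));
    }
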
    /// A mock [`StorageCollections`] implementation that serves the configured read
    /// and write frontiers. Methods not needed for as-of selection are left
    /// unimplemented.
    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);

    #[async_trait]
    impl StorageCollections for StorageFrontiers {
        type Timestamp = Timestamp;

        async fn initialize_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _init_ids: BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn update_parameters(&self, _config_params: StorageParameters) {
            unimplemented!()
        }

        fn collection_metadata(
            &self,
            _id: GlobalId,
        ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
            unimplemented!()
        }

        fn collections_frontiers(
            &self,
            ids: Vec<GlobalId>,
        ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>
        {
            let mut frontiers = Vec::with_capacity(ids.len());
            for id in ids {
                let (read, write) = self.0.get(&id).ok_or(StorageError::IdentifierMissing(id))?;
                frontiers.push(CollectionFrontiers {
                    id,
                    write_frontier: write.clone(),
                    implied_capability: read.clone(),
                    read_capabilities: read.clone(),
                })
            }
            Ok(frontiers)
        }

        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
            unimplemented!()
        }

        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_parts_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
            unimplemented!()
        }

        fn snapshot(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>
        {
            unimplemented!()
        }

        async fn snapshot_latest(
            &self,
            _id: GlobalId,
        ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn snapshot_cursor(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>,
        >
        where
            Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
        {
            unimplemented!()
        }

        fn snapshot_and_stream(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<
                BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
                StorageError<Self::Timestamp>,
            >,
        > {
            unimplemented!()
        }

        fn create_update_builder(
            &self,
            _id: GlobalId,
        ) -> BoxFuture<
            'static,
            Result<
                TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
                StorageError<Self::Timestamp>,
            >,
        >
        where
            Self::Timestamp: Lattice + Codec64,
        {
            unimplemented!()
        }

        async fn prepare_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _ids_to_add: BTreeSet<GlobalId>,
            _ids_to_drop: BTreeSet<GlobalId>,
            _ids_to_register: BTreeMap<GlobalId, ShardId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn create_collections_for_bootstrap(
            &self,
            _storage_metadata: &StorageMetadata,
            _register_ts: Option<Self::Timestamp>,
            _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
            _migrated_storage_collections: &BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_source_desc(
            &self,
            _ingestion_id: GlobalId,
            _source_desc: SourceDesc,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_export_data_configs(
            &self,
            _source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_connections(
            &self,
            _source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_table_desc(
            &self,
            _existing_collection: GlobalId,
            _new_collection: GlobalId,
            _new_desc: RelationDesc,
            _expected_version: RelationVersion,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn drop_collections_unvalidated(
            &self,
            _storage_metadata: &StorageMetadata,
            _identifiers: Vec<GlobalId>,
        ) {
            unimplemented!()
        }

        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
            unimplemented!()
        }

        fn acquire_read_holds(
            &self,
            desired_holds: Vec<GlobalId>,
        ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
            let mut holds = Vec::with_capacity(desired_holds.len());
            for id in desired_holds {
                let (read, _write) = self
                    .0
                    .get(&id)
                    .ok_or(ReadHoldError::CollectionMissing(id))?;
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                holds.push(ReadHold::with_channel(id, read.clone(), tx));
            }
            Ok(holds)
        }

        fn determine_time_dependence(
            &self,
            _id: GlobalId,
        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
            unimplemented!()
        }
    }

    /// Constructs a dataflow for testing, with the given export and inputs.
    ///
    /// Inputs contained in `storage_ids` become source imports, all others index
    /// imports. Likewise, the export becomes a sink export if it is contained in
    /// `storage_ids`, and an index export otherwise.
    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: RelationType::empty(),
                };
                (id, (desc, Default::default(), Default::default()))
            })
            .collect();
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: RelationType::empty(),
                    monotonic: Default::default(),
                };
                (id, import)
            })
            .collect();
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = RelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }

    /// Specifies an as-of selection test as the frontiers of the involved storage
    /// collections, a list of dataflows with their expected as-ofs, the current time,
    /// and optionally a set of read policies.
    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }

    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"] => 10,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 12,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

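    // An additional sanity check: a storage sink's write-frontier constraint (here
    // `u2` with write frontier 15, stepped back to 14) propagates upstream to the
    // index `u1` it reads from.
    testcase!(sink_constraint_propagates_to_index, {
        storage: {
            "s1": (10, 20),
            "u2": (10, 15),
        },
        dataflows: [
            "u1" <- ["s1"] => 14,
            "u2" <- ["u1"] => 14,
        ],
        current_time: 100,
    });
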
    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s2"] => 19,
            "u3" <- ["s3"] => 39,
            "u4" <- ["s4"] => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- [] => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- [] => 15,
        ],
        current_time: 15,
    });

    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });
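
    // Direct checks of the small helpers used throughout as-of selection.

    // `fixpoint` must rerun its step until a full pass reports no change.
    #[mz_ore::test]
    fn fixpoint_runs_until_unchanged() {
        let mut counter = 0;
        fixpoint(|changed| {
            if counter < 3 {
                counter += 1;
                *changed = true;
            }
        });
        assert_eq!(counter, 3);
    }

    // `step_back_frontier` steps each element back by one, saturating at the
    // minimum timestamp.
    #[mz_ore::test]
    fn step_back_frontier_saturates() {
        assert_eq!(step_back_frontier(&ts_to_frontier(10)), ts_to_frontier(9));
        // The minimum timestamp saturates instead of underflowing.
        assert_eq!(step_back_frontier(&ts_to_frontier(0)), ts_to_frontier(0));
        // The empty (sealed) frontier has no elements to step back.
        assert_eq!(
            step_back_frontier(&ts_to_frontier(SEALED)),
            Antichain::new()
        );
    }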
}