use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

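/// Selects as-of frontiers for the given dataflows and writes them to the
/// dataflows' `as_of` fields.
///
/// To keep the selected as-ofs valid, read holds are acquired on the
/// dataflows' storage inputs and returned to the caller.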
pub fn run<T: TimestampManipulation>(
    dataflows: &mut [DataflowDescription<Plan<T>, (), T>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy<T>>,
    storage_collections: &dyn StorageCollections<Timestamp = T>,
    current_time: T,
    read_only_mode: bool,
) -> BTreeMap<GlobalId, ReadHold<T>> {
    let mut storage_read_holds = BTreeMap::new();
    // Acquire read holds on all storage inputs, to ensure their read
    // frontiers cannot advance while we are selecting as-ofs.
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows that sink into sealed storage collections cannot produce any
    // more output, so exclude them from as-of selection.
    ctx.prune_sealed_persist_sinks();

    if read_only_mode {
        // In read-only mode, storage inputs may have been dropped
        // concurrently, so dataflows depending on them must be excluded too.
        ctx.prune_dropped_collections();
    }

    // Apply hard constraints derived from storage inputs and outputs.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    // Apply the remaining, soft constraints on a best-effort basis.
    ctx.apply_warmup_constraints();
    ctx.apply_index_read_policy_constraints();
    ctx.apply_index_current_time_constraints();

    // Select the best as-of for each dataflow. All exports of a dataflow
    // share the same bounds, so consulting the first export suffices.
    // Dataflows without exports, or whose exports were pruned above, are
    // assigned an empty as-of.
    for dataflow in dataflows {
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

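/// Bounds for the possible as-of values of a dataflow.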
#[derive(Debug)]
struct AsOfBounds<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    /// Whether these bounds are sealed, i.e., no further constraints may change them.
    sealed: bool,
}

impl<T: Clone> AsOfBounds<T> {
    /// Creates `AsOfBounds` that only allow the given `frontier`.
    fn single(frontier: Antichain<T>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Returns the bound of the given type.
    fn get(&self, type_: BoundType) -> &Antichain<T> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl<T: Timestamp> Default for AsOfBounds<T> {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(T::minimum()),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl<T: fmt::Debug> fmt::Display for AsOfBounds<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

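/// The type of an as-of bound.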
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

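/// The type of an as-of constraint.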
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints must be satisfied; failing to apply one is an error.
    Hard,
    /// Soft constraints are applied on a best-effort basis; failing to apply
    /// one is expected and merely logged.
    Soft,
}

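/// A constraint on the as-of bounds of a collection.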
#[derive(Debug)]
struct Constraint<'a, T> {
    type_: ConstraintType,
    /// Which of the bounds the constraint applies to.
    bound_type: BoundType,
    /// The frontier by which the bound is constrained.
    frontier: &'a Antichain<T>,
    /// A human-readable description of the reason for the constraint, for logging.
    reason: &'a str,
}

impl<T: Timestamp> Constraint<'_, T> {
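    /// Applies the constraint to the given bounds.
    ///
    /// Returns `Ok(changed)` if the constraint was applied successfully, and
    /// `Err(changed)` if applying it would have inverted the bounds. In the
    /// latter case, the constrained bound is clamped to the opposite bound
    /// and the bounds are sealed against further changes. In both cases,
    /// `changed` reports whether the bounds were modified.
    ///
    /// Constraints applied to sealed bounds are ignored.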
    fn apply(&self, bounds: &mut AsOfBounds<T>) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

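/// State tracked for a compute collection during as-of selection.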
struct Collection<'a, T> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy<T>>,
    /// The as-of bounds, shared by all exports of the same dataflow.
    bounds: Rc<RefCell<AsOfBounds<T>>>,
    /// Whether this collection is an index export.
    is_index: bool,
}

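/// The as-of selection context.
///
/// Tracks the as-of bounds of all collections exported by the dataflows under
/// consideration.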
struct Context<'a, T> {
    collections: BTreeMap<GlobalId, Collection<'a, T>>,
    storage_collections: &'a dyn StorageCollections<Timestamp = T>,
    current_time: T,
}

impl<'a, T: TimestampManipulation> Context<'a, T> {
    /// Creates a new `Context` for the given dataflows.
    fn new(
        dataflows: &[DataflowDescription<Plan<T>, (), T>],
        storage_collections: &'a dyn StorageCollections<Timestamp = T>,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy<T>>,
        current_time: T,
    ) -> Self {
        // Initialize collection state for each dataflow export. Dataflows
        // that already have an as-of assigned only allow that exact frontier.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

    /// Returns the collection with the given ID.
    ///
    /// # Panics
    ///
    /// Panics if the collection doesn't exist.
    fn expect_collection(&self, id: GlobalId) -> &Collection<'_, T> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

    /// Applies the given as-of constraint to the given collection.
    ///
    /// Returns whether the collection's as-of bounds changed as a result.
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint<T>) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

    /// Applies as-of constraints imposed by the read frontiers of the
    /// dataflows' storage inputs.
    ///
    /// A collection's as-of must be at least the read frontier of each of its
    /// (transitive) storage inputs, so the dataflow is able to read its
    /// inputs at the as-of.
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold<T>>,
    ) {
        // Apply direct storage input constraints.
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate lower bounds downstream, through compute dependencies.
        self.propagate_bounds_downstream(BoundType::Lower);
    }

    /// Applies as-of constraints imposed by the write frontiers of the
    /// dataflows' storage exports.
    ///
    /// A collection that exports to storage must not have an as-of beyond the
    /// export's write frontier stepped back by one, or the dataflow could not
    /// resume writing where the previous contents of the output collection
    /// end. For output collections that are still empty, the read
    /// capabilities are used instead.
    fn apply_downstream_storage_constraints(&self) {
        // Apply direct storage export constraints.
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                Antichain::from_iter(
                    frontiers
                        .write_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // Propagate upper bounds upstream, through compute dependencies.
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Applies as-of constraints that ensure dataflows can hydrate
    /// immediately.
    ///
    /// A dataflow can only fully hydrate once its as-of has been reached by
    /// the write frontiers of all its (transitive) storage inputs. To that
    /// end, each collection's as-of is constrained, on a best-effort basis,
    /// to be less than the combined write frontier of its storage inputs.
    fn apply_warmup_constraints(&self) {
        // Collect the combined write frontiers of each collection's direct
        // storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute dependencies, to a
        // fixpoint.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the derived warmup frontiers as soft upper bounds.
        for (id, write_frontier) in write_frontiers {
            let upper = step_back_frontier(&write_frontier);
            let constraint = Constraint {
                type_: ConstraintType::Soft,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!(
                    "warmup frontier derived from storage write frontier {:?}",
                    write_frontier.elements()
                ),
            };
            self.apply_constraint(id, constraint);
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Applies as-of constraints derived from the read policies of exported
    /// indexes.
    ///
    /// Index read policies are defined relative to the index's write
    /// frontier. Since the index doesn't have a write frontier yet at this
    /// point, we approximate it with the combined write frontier of the
    /// index's (transitive) storage inputs, which the index's own write
    /// frontier will track once the dataflow is running.
    fn apply_index_read_policy_constraints(&self) {
        // Collect per-collection write frontiers, as in
        // `apply_warmup_constraints`.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // For each index that has a read policy, apply the policy to its
        // expected write frontier as a soft upper bound. If that frontier is
        // empty (e.g. for indexes without storage inputs), apply the policy
        // to the current time instead.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Applies as-of constraints that ensure indexes are readable at the
    /// current time.
    ///
    /// An index whose as-of is beyond the current time cannot serve queries
    /// at the current time, so index as-ofs are constrained accordingly, on a
    /// best-effort basis.
    fn apply_index_current_time_constraints(&self) {
        // Constrain the as-of of each index to be at most the current time.
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Propagates as-of bounds of the given type downstream, along compute
    /// input dependencies.
    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
        // Lower bounds propagate downstream as hard constraints: a dataflow
        // cannot have an as-of earlier than the as-ofs of its compute inputs.
        // Upper bounds propagate downstream only on a best-effort basis.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Hard,
            BoundType::Upper => ConstraintType::Soft,
        };

        fixpoint(|changed| {
            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);

            // Propagating upper bounds downstream can break the hard
            // invariant that a collection's upper bound is never greater than
            // those of its dependants, so propagate hard upper bounds
            // upstream again to restore it.
            if bound_type == BoundType::Upper {
                self.propagate_bounds_upstream_inner(
                    BoundType::Upper,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_downstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.compute_inputs {
                let input_collection = self.expect_collection(*input_id);
                let bounds = input_collection.bounds.borrow();
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*id, constraint);
            }
        }
    }

    /// Propagates as-of bounds of the given type upstream, along compute
    /// input dependencies.
    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
        // Upper bounds propagate upstream as hard constraints: a compute
        // input must not end up with an as-of beyond those of its dependants,
        // or they could not read it. Lower bounds propagate upstream only on
        // a best-effort basis.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Soft,
            BoundType::Upper => ConstraintType::Hard,
        };

        fixpoint(|changed| {
            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);

            // Propagating lower bounds upstream can break the hard invariant
            // that a collection's lower bound is never less than those of its
            // inputs, so propagate hard lower bounds downstream again to
            // restore it.
            if bound_type == BoundType::Lower {
                self.propagate_bounds_downstream_inner(
                    BoundType::Lower,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_upstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        // Visit collections in reverse ID order; dependencies typically have
        // smaller IDs than their dependants, so this reaches a fixpoint in
        // fewer passes.
        for (id, collection) in self.collections.iter().rev() {
            let bounds = collection.bounds.borrow();
            for input_id in &collection.compute_inputs {
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("downstream {id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*input_id, constraint);
            }
        }
    }

    /// Returns the best as-of for the identified collection.
    ///
    /// The bounds describe a range of valid as-ofs; we choose the upper
    /// bound, the latest valid as-of, as it minimizes the historical data the
    /// dataflow has to process. Unknown (i.e. pruned) collections get an
    /// empty as-of.
    fn best_as_of(&self, id: GlobalId) -> Antichain<T> {
        if let Some(collection) = self.collections.get(&id) {
            let bounds = collection.bounds.borrow();
            bounds.upper.clone()
        } else {
            Antichain::new()
        }
    }

    /// Removes collections that export to sealed storage collections.
    ///
    /// Such collections cannot produce any more output, so there is no point
    /// in selecting as-ofs for them. Removing them here causes their
    /// dataflows to be assigned an empty as-of later.
    fn prune_sealed_persist_sinks(&mut self) {
        self.collections.retain(|id, _| {
            self.storage_collections
                .collection_frontiers(*id)
                .map_or(true, |f| !f.write_frontier.is_empty())
        });
    }

    /// Removes collections that depend on dropped storage collections.
    ///
    /// This is only relevant in read-only mode, where storage collections may
    /// be dropped concurrently by another process. Dataflows depending on
    /// dropped collections cannot be installed, so they are removed here,
    /// transitively.
    fn prune_dropped_collections(&mut self) {
        // Prune collections that directly depend on dropped storage
        // collections, recognizable by their empty read capabilities.
        let mut pruned = BTreeSet::new();
        self.collections.retain(|id, c| {
            let input_dropped = c.storage_inputs.iter().any(|id| {
                let frontiers = self
                    .storage_collections
                    .collection_frontiers(*id)
                    .expect("storage collection exists");
                frontiers.read_capabilities.is_empty()
            });

            if input_dropped {
                pruned.insert(*id);
                false
            } else {
                true
            }
        });

        warn!(?pruned, "pruned dependants of dropped storage collections");

        // Transitively prune collections that depend on pruned collections.
        while !pruned.is_empty() {
            let pruned_inputs = std::mem::take(&mut pruned);

            self.collections.retain(|id, c| {
                if c.compute_inputs.iter().any(|id| pruned_inputs.contains(id)) {
                    pruned.insert(*id);
                    false
                } else {
                    true
                }
            });

            warn!(?pruned, "pruned collections with pruned inputs");
        }
    }
}

/// Runs the given step function repeatedly until it stops reporting changes.
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    loop {
        let mut changed = false;
        step(&mut changed);
        if !changed {
            break;
        }
    }
}

/// Returns the given frontier, stepped back by one tick.
///
/// Elements that are already at the minimum time are kept unchanged.
fn step_back_frontier<T: TimestampManipulation>(frontier: &Antichain<T>) -> Antichain<T> {
    frontier
        .iter()
        .map(|t| t.step_back().unwrap_or_else(T::minimum))
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use differential_dataflow::lattice::Lattice;
    use futures::future::BoxFuture;
    use futures::stream::BoxStream;
    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
    use mz_compute_types::sinks::{
        ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
    };
    use mz_compute_types::sources::{SourceInstanceArguments, SourceInstanceDesc};
    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
    use mz_persist_types::{Codec64, ShardId};
    use mz_repr::{RelationDesc, RelationVersion, Row, SqlRelationType, Timestamp};
    use mz_storage_client::client::TimestamplessUpdateBuilder;
    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
    use mz_storage_types::StorageDiff;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::controller::{CollectionMetadata, StorageError};
    use mz_storage_types::errors::CollectionMissing;
    use mz_storage_types::parameters::StorageParameters;
    use mz_storage_types::sources::{
        GenericSourceConnection, SourceData, SourceDesc, SourceExportDataConfig,
    };
    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
    use timely::progress::Timestamp as TimelyTimestamp;

    use super::*;

    /// Sentinel timestamp denoting a sealed (i.e. empty) frontier.
    const SEALED: u64 = 0x5ea1ed;

    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
        if ts == SEALED {
            Antichain::new()
        } else {
            Antichain::from_elem(ts.into())
        }
    }

    /// A mock `StorageCollections` that reports the (read, write) frontiers
    /// recorded in the wrapped map.
    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);

    #[async_trait]
    impl StorageCollections for StorageFrontiers {
        type Timestamp = Timestamp;

        async fn initialize_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _init_ids: BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn update_parameters(&self, _config_params: StorageParameters) {
            unimplemented!()
        }

        fn collection_metadata(
            &self,
            _id: GlobalId,
        ) -> Result<CollectionMetadata, CollectionMissing> {
            unimplemented!()
        }

        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
            unimplemented!()
        }

        fn collections_frontiers(
            &self,
            ids: Vec<GlobalId>,
        ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
            let mut frontiers = Vec::with_capacity(ids.len());
            for id in ids {
                let (read, write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
                frontiers.push(CollectionFrontiers {
                    id,
                    write_frontier: write.clone(),
                    implied_capability: read.clone(),
                    read_capabilities: read.clone(),
                })
            }
            Ok(frontiers)
        }

        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
            unimplemented!()
        }

        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_parts_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
            unimplemented!()
        }

        fn snapshot(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>
        {
            unimplemented!()
        }

        async fn snapshot_latest(
            &self,
            _id: GlobalId,
        ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn snapshot_cursor(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>,
        >
        where
            Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
        {
            unimplemented!()
        }

        fn snapshot_and_stream(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<
                BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
                StorageError<Self::Timestamp>,
            >,
        > {
            unimplemented!()
        }

        fn create_update_builder(
            &self,
            _id: GlobalId,
        ) -> BoxFuture<
            'static,
            Result<
                TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
                StorageError<Self::Timestamp>,
            >,
        >
        where
            Self::Timestamp: Lattice + Codec64,
        {
            unimplemented!()
        }

        async fn prepare_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _ids_to_add: BTreeSet<GlobalId>,
            _ids_to_drop: BTreeSet<GlobalId>,
            _ids_to_register: BTreeMap<GlobalId, ShardId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn create_collections_for_bootstrap(
            &self,
            _storage_metadata: &StorageMetadata,
            _register_ts: Option<Self::Timestamp>,
            _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
            _migrated_storage_collections: &BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_source_desc(
            &self,
            _ingestion_id: GlobalId,
            _source_desc: SourceDesc,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_export_data_configs(
            &self,
            _source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_connections(
            &self,
            _source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_table_desc(
            &self,
            _existing_collection: GlobalId,
            _new_collection: GlobalId,
            _new_desc: RelationDesc,
            _expected_version: RelationVersion,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn drop_collections_unvalidated(
            &self,
            _storage_metadata: &StorageMetadata,
            _identifiers: Vec<GlobalId>,
        ) {
            unimplemented!()
        }

        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
            unimplemented!()
        }

        fn acquire_read_holds(
            &self,
            desired_holds: Vec<GlobalId>,
        ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
            let mut holds = Vec::with_capacity(desired_holds.len());
            for id in desired_holds {
                let (read, _write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                holds.push(ReadHold::with_channel(id, read.clone(), tx));
            }
            Ok(holds)
        }

        fn determine_time_dependence(
            &self,
            _id: GlobalId,
        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
            unimplemented!()
        }
    }

    /// Constructs a `DataflowDescription` that exports `export_id` and
    /// imports `input_ids`. IDs listed in `storage_ids` are treated as
    /// storage collections, all other IDs as indexes.
    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: SqlRelationType::empty(),
                };
                (id, (desc, Default::default(), Default::default()))
            })
            .collect();
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: SqlRelationType::empty(),
                    monotonic: Default::default(),
                };
                (id, import)
            })
            .collect();
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = SqlRelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }

    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
            $( read_only: $read_only:expr, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                #[allow(unused_variables)]
                let read_only = false;
                $( let read_only = $read_only; )?

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                    read_only,
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }

    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"] => 10,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 14,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s2"] => 19,
            "u3" <- ["s3"] => 39,
            "u4" <- ["s4"] => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- [] => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- [] => 15,
        ],
        current_time: 15,
    });

    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });

    testcase!(read_only_dropped_storage_inputs, {
        storage: {
            "s1": (10, 20),
            "s2": (SEALED, SEALED),
            "u4": (10, 20),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => SEALED,
            "u3" <- ["s1", "s2"] => SEALED,
            "u4" <- ["u2"] => SEALED,
        ],
        current_time: 15,
        read_only: true,
    });

    // Regression test for GitHub issue 9273.
    testcase!(github_9273, {
        storage: {
            "s1": (10, 20),
            "u3": (14, 15),
        },
        dataflows: [
            "u1" <- ["s1"] => 14,
            "u2" <- ["u1"] => 19,
            "u3" <- ["u1"] => 14,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(1.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(1.into(), 1.into()),
        },
    });
}