1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14
15use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
16use mz_ore::soft_assert_or_log;
17use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
18use mz_repr::refresh_schedule::RefreshSchedule;
19use mz_repr::{GlobalId, RelationType};
20use mz_storage_types::controller::CollectionMetadata;
21use mz_storage_types::time_dependence::TimeDependence;
22use proptest::prelude::{Arbitrary, any};
23use proptest::strategy::{BoxedStrategy, Strategy};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use timely::progress::Antichain;
27
28use crate::dataflows::proto_dataflow_description::{
29 ProtoIndexExport, ProtoIndexImport, ProtoSinkExport, ProtoSourceImport,
30};
31use crate::plan::Plan;
32use crate::plan::render_plan::RenderPlan;
33use crate::sinks::{ComputeSinkConnection, ComputeSinkDesc};
34use crate::sources::{SourceInstanceArguments, SourceInstanceDesc};
35
36include!(concat!(env!("OUT_DIR"), "/mz_compute_types.dataflows.rs"));
37
38#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
40pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
41 pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>)>,
43 pub index_imports: BTreeMap<GlobalId, IndexImport>,
46 pub objects_to_build: Vec<BuildDesc<P>>,
50 pub index_exports: BTreeMap<GlobalId, (IndexDesc, RelationType)>,
53 pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
56 pub as_of: Option<Antichain<T>>,
62 pub until: Antichain<T>,
69 pub initial_storage_as_of: Option<Antichain<T>>,
72 pub refresh_schedule: Option<RefreshSchedule>,
74 pub debug_name: String,
76 pub time_dependence: Option<TimeDependence>,
78}
79
80impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
81 pub fn is_single_time(&self) -> bool {
86 let until = &self.until;
90
91 let Some(as_of) = self.as_of.as_ref() else {
93 return false;
94 };
95 soft_assert_or_log!(
97 timely::PartialOrder::less_equal(as_of, until),
98 "expected empty `as_of ≤ until`, got `{as_of:?} ≰ {until:?}`",
99 );
100 let Some(as_of) = as_of.as_option() else {
102 return false;
103 };
104 soft_assert_or_log!(
106 as_of != &mz_repr::Timestamp::MAX || until.is_empty(),
107 "expected `until = {{}}` due to `as_of = MAX`, got `until = {until:?}`",
108 );
109 as_of.try_step_forward().as_ref() == until.as_option()
112 }
113}
114
115impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
116 pub fn check_invariants(&self) -> Result<(), String> {
118 let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect();
119 let mut lir_ids = BTreeSet::new();
120
121 while let Some(plan) = plans.pop() {
122 let lir_id = plan.lir_id;
123 if !lir_ids.insert(lir_id) {
124 return Err(format!(
125 "duplicate `LirId` in `DataflowDescription`: {lir_id}"
126 ));
127 }
128 plans.extend(plan.node.children());
129 }
130
131 Ok(())
132 }
133}
134
135impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
136 pub fn new(name: String) -> Self {
138 Self {
139 source_imports: Default::default(),
140 index_imports: Default::default(),
141 objects_to_build: Vec::new(),
142 index_exports: Default::default(),
143 sink_exports: Default::default(),
144 as_of: Default::default(),
145 until: Antichain::new(),
146 initial_storage_as_of: None,
147 refresh_schedule: None,
148 debug_name: name,
149 time_dependence: None,
150 }
151 }
152
153 pub fn import_index(
159 &mut self,
160 id: GlobalId,
161 desc: IndexDesc,
162 typ: RelationType,
163 monotonic: bool,
164 ) {
165 self.index_imports.insert(
166 id,
167 IndexImport {
168 desc,
169 typ,
170 monotonic,
171 },
172 );
173 }
174
175 pub fn import_source(&mut self, id: GlobalId, typ: RelationType, monotonic: bool) {
177 self.source_imports.insert(
180 id,
181 (
182 SourceInstanceDesc {
183 storage_metadata: (),
184 arguments: SourceInstanceArguments { operators: None },
185 typ,
186 },
187 monotonic,
188 Antichain::new(),
189 ),
190 );
191 }
192
193 pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
195 self.objects_to_build.push(BuildDesc { id, plan });
196 }
197
198 pub fn export_index(&mut self, id: GlobalId, description: IndexDesc, on_type: RelationType) {
203 self.insert_plan(
206 id,
207 OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
208 input: Box::new(MirRelationExpr::global_get(
209 description.on_id,
210 on_type.clone(),
211 )),
212 keys: vec![description.key.clone()],
213 }),
214 );
215 self.index_exports.insert(id, (description, on_type));
216 }
217
218 pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) {
220 self.sink_exports.insert(id, description);
221 }
222
223 pub fn is_imported(&self, id: &GlobalId) -> bool {
225 self.objects_to_build.iter().any(|bd| &bd.id == id)
226 || self.index_imports.keys().any(|i| i == id)
227 || self.source_imports.keys().any(|i| i == id)
228 }
229
230 pub fn arity_of(&self, id: &GlobalId) -> usize {
232 for (source_id, (source, _monotonic, _upper)) in self.source_imports.iter() {
233 if source_id == id {
234 return source.typ.arity();
235 }
236 }
237 for IndexImport { desc, typ, .. } in self.index_imports.values() {
238 if &desc.on_id == id {
239 return typ.arity();
240 }
241 }
242 for desc in self.objects_to_build.iter() {
243 if &desc.id == id {
244 return desc.plan.arity();
245 }
246 }
247 panic!("GlobalId {} not found in DataflowDesc", id);
248 }
249
250 pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
252 where
253 R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
254 S: Fn(&mut MirScalarExpr) -> Result<(), E>,
255 {
256 for BuildDesc { plan, .. } in &mut self.objects_to_build {
257 r(plan)?;
258 }
259 for (source_instance_desc, _, _upper) in self.source_imports.values_mut() {
260 let Some(mfp) = source_instance_desc.arguments.operators.as_mut() else {
261 continue;
262 };
263 for expr in mfp.expressions.iter_mut() {
264 s(expr)?;
265 }
266 for (_, expr) in mfp.predicates.iter_mut() {
267 s(expr)?;
268 }
269 }
270 Ok(())
271 }
272}
273
274impl<P, S, T> DataflowDescription<P, S, T> {
275 pub fn set_as_of(&mut self, as_of: Antichain<T>) {
299 self.as_of = Some(as_of);
300 }
301
302 pub fn set_initial_as_of(&mut self, initial_as_of: Antichain<T>) {
304 self.initial_storage_as_of = Some(initial_as_of);
305 }
306
307 pub fn import_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
309 self.imported_index_ids().chain(self.imported_source_ids())
310 }
311
312 pub fn imported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
314 self.index_imports.keys().copied()
315 }
316
317 pub fn imported_source_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
319 self.source_imports.keys().copied()
320 }
321
322 pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
324 self.exported_index_ids().chain(self.exported_sink_ids())
325 }
326
327 pub fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
329 self.index_exports.keys().copied()
330 }
331
332 pub fn exported_sink_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
334 self.sink_exports.keys().copied()
335 }
336
337 pub fn persist_sink_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
339 self.sink_exports
340 .iter()
341 .filter_map(|(id, desc)| match desc.connection {
342 ComputeSinkConnection::MaterializedView(_) => Some(*id),
343 ComputeSinkConnection::ContinualTask(_) => Some(*id),
344 _ => None,
345 })
346 }
347
348 pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
350 self.sink_exports
351 .iter()
352 .filter_map(|(id, desc)| match desc.connection {
353 ComputeSinkConnection::Subscribe(_) => Some(*id),
354 _ => None,
355 })
356 }
357
358 pub fn continual_task_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
360 self.sink_exports
361 .iter()
362 .filter_map(|(id, desc)| match desc.connection {
363 ComputeSinkConnection::ContinualTask(_) => Some(*id),
364 _ => None,
365 })
366 }
367
368 pub fn copy_to_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
370 self.sink_exports
371 .iter()
372 .filter_map(|(id, desc)| match desc.connection {
373 ComputeSinkConnection::CopyToS3Oneshot(_) => Some(*id),
374 _ => None,
375 })
376 }
377
378 pub fn display_import_ids(&self) -> impl fmt::Display + '_ {
380 use mz_ore::str::{bracketed, separated};
381 bracketed("[", "]", separated(", ", self.import_ids()))
382 }
383
384 pub fn display_export_ids(&self) -> impl fmt::Display + '_ {
386 use mz_ore::str::{bracketed, separated};
387 bracketed("[", "]", separated(", ", self.export_ids()))
388 }
389
390 pub fn is_transient(&self) -> bool {
392 self.export_ids().all(|id| id.is_transient())
393 }
394
395 pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
402 let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
403 let build = builds
404 .next()
405 .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
406 assert!(builds.next().is_none());
407 build
408 }
409}
410
411impl<P, S, T> DataflowDescription<P, S, T>
412where
413 P: CollectionPlan,
414{
415 pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
425 let mut out = BTreeSet::new();
426 self.depends_on_into(collection_id, &mut out);
427 out
428 }
429
430 pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
432 out.insert(collection_id);
433 if self.source_imports.contains_key(&collection_id) {
434 out.insert(collection_id);
437 return;
438 }
439
440 let mut found_index = false;
444 for (index_id, IndexImport { desc, .. }) in &self.index_imports {
445 if desc.on_id == collection_id {
446 out.insert(*index_id);
449 found_index = true;
450 }
451 }
452 if found_index {
453 return;
454 }
455
456 let build = self.build_desc(collection_id);
459 for id in build.plan.depends_on() {
460 if !out.contains(&id) {
461 self.depends_on_into(id, out)
462 }
463 }
464 }
465
466 pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
471 let is_import = |id: &GlobalId| {
472 self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
473 };
474
475 let deps = self.depends_on(collection_id);
476 deps.into_iter().filter(is_import).collect()
477 }
478}
479
480impl<S, T> DataflowDescription<RenderPlan, S, T>
481where
482 S: Clone + PartialEq,
483 T: Clone + timely::PartialOrder,
484{
485 pub fn compatible_with(&self, other: &Self) -> bool {
496 let old = self.as_comparable();
497 let new = other.as_comparable();
498
499 let equality = old.index_exports == new.index_exports
500 && old.sink_exports == new.sink_exports
501 && old.objects_to_build == new.objects_to_build
502 && old.index_imports == new.index_imports
503 && old.source_imports == new.source_imports
504 && old.time_dependence == new.time_dependence;
505
506 let partial = if let (Some(old_as_of), Some(new_as_of)) = (&old.as_of, &new.as_of) {
507 timely::PartialOrder::less_equal(old_as_of, new_as_of)
508 } else {
509 false
510 };
511
512 equality && partial
513 }
514
515 fn as_comparable(&self) -> Self {
522 let external_ids: BTreeSet<_> = self.import_ids().chain(self.export_ids()).collect();
523
524 let mut id_counter = 0;
525 let mut replacements = BTreeMap::new();
526
527 let mut maybe_replace = |id: GlobalId| {
528 if id.is_transient() && !external_ids.contains(&id) {
529 *replacements.entry(id).or_insert_with(|| {
530 id_counter += 1;
531 GlobalId::Transient(id_counter)
532 })
533 } else {
534 id
535 }
536 };
537
538 let mut source_imports = self.source_imports.clone();
539 for (_source, _monotonic, upper) in source_imports.values_mut() {
540 *upper = Antichain::new();
541 }
542
543 let mut objects_to_build = self.objects_to_build.clone();
544 for object in &mut objects_to_build {
545 object.id = maybe_replace(object.id);
546 object.plan.replace_ids(&mut maybe_replace);
547 }
548
549 let mut index_exports = self.index_exports.clone();
550 for (desc, _typ) in index_exports.values_mut() {
551 desc.on_id = maybe_replace(desc.on_id);
552 }
553
554 let mut sink_exports = self.sink_exports.clone();
555 for desc in sink_exports.values_mut() {
556 desc.from = maybe_replace(desc.from);
557 }
558
559 DataflowDescription {
560 source_imports,
561 index_imports: self.index_imports.clone(),
562 objects_to_build,
563 index_exports,
564 sink_exports,
565 as_of: self.as_of.clone(),
566 until: self.until.clone(),
567 initial_storage_as_of: self.initial_storage_as_of.clone(),
568 refresh_schedule: self.refresh_schedule.clone(),
569 debug_name: self.debug_name.clone(),
570 time_dependence: self.time_dependence.clone(),
571 }
572 }
573}
574
575impl RustType<ProtoDataflowDescription> for DataflowDescription<RenderPlan, CollectionMetadata> {
576 fn into_proto(&self) -> ProtoDataflowDescription {
577 ProtoDataflowDescription {
578 source_imports: self.source_imports.into_proto(),
579 index_imports: self.index_imports.into_proto(),
580 objects_to_build: self.objects_to_build.into_proto(),
581 index_exports: self.index_exports.into_proto(),
582 sink_exports: self.sink_exports.into_proto(),
583 as_of: self.as_of.into_proto(),
584 until: Some(self.until.into_proto()),
585 initial_storage_as_of: self.initial_storage_as_of.into_proto(),
586 refresh_schedule: self.refresh_schedule.into_proto(),
587 debug_name: self.debug_name.clone(),
588 time_dependence: self.time_dependence.into_proto(),
589 }
590 }
591
592 fn from_proto(proto: ProtoDataflowDescription) -> Result<Self, TryFromProtoError> {
593 Ok(DataflowDescription {
594 source_imports: proto.source_imports.into_rust()?,
595 index_imports: proto.index_imports.into_rust()?,
596 objects_to_build: proto.objects_to_build.into_rust()?,
597 index_exports: proto.index_exports.into_rust()?,
598 sink_exports: proto.sink_exports.into_rust()?,
599 as_of: proto.as_of.map(|x| x.into_rust()).transpose()?,
600 until: proto
601 .until
602 .map(|x| x.into_rust())
603 .transpose()?
604 .unwrap_or_else(Antichain::new),
605 initial_storage_as_of: proto
606 .initial_storage_as_of
607 .map(|x| x.into_rust())
608 .transpose()?,
609 refresh_schedule: proto.refresh_schedule.into_rust()?,
610 debug_name: proto.debug_name,
611 time_dependence: proto.time_dependence.into_rust()?,
612 })
613 }
614}
615
616impl
617 ProtoMapEntry<
618 GlobalId,
619 (
620 SourceInstanceDesc<CollectionMetadata>,
621 bool,
622 Antichain<mz_repr::Timestamp>,
623 ),
624 > for ProtoSourceImport
625{
626 fn from_rust<'a>(
627 entry: (
628 &'a GlobalId,
629 &'a (
630 SourceInstanceDesc<CollectionMetadata>,
631 bool,
632 Antichain<mz_repr::Timestamp>,
633 ),
634 ),
635 ) -> Self {
636 ProtoSourceImport {
637 id: Some(entry.0.into_proto()),
638 source_instance_desc: Some((entry.1).0.into_proto()),
639 monotonic: (entry.1).1.into_proto(),
640 upper: Some((entry.1).2.into_proto()),
641 }
642 }
643
644 fn into_rust(
645 self,
646 ) -> Result<
647 (
648 GlobalId,
649 (
650 SourceInstanceDesc<CollectionMetadata>,
651 bool,
652 Antichain<mz_repr::Timestamp>,
653 ),
654 ),
655 TryFromProtoError,
656 > {
657 Ok((
658 self.id.into_rust_if_some("ProtoSourceImport::id")?,
659 (
660 self.source_instance_desc
661 .into_rust_if_some("ProtoSourceImport::source_instance_desc")?,
662 self.monotonic.into_rust()?,
663 self.upper.into_rust_if_some("ProtoSourceImport::upper")?,
664 ),
665 ))
666 }
667}
668
669impl ProtoMapEntry<GlobalId, IndexImport> for ProtoIndexImport {
670 fn from_rust<'a>(
671 (
672 id,
673 IndexImport {
674 desc,
675 typ,
676 monotonic,
677 },
678 ): (&'a GlobalId, &'a IndexImport),
679 ) -> Self {
680 ProtoIndexImport {
681 id: Some(id.into_proto()),
682 index_desc: Some(desc.into_proto()),
683 typ: Some(typ.into_proto()),
684 monotonic: monotonic.into_proto(),
685 }
686 }
687
688 fn into_rust(self) -> Result<(GlobalId, IndexImport), TryFromProtoError> {
689 Ok((
690 self.id.into_rust_if_some("ProtoIndex::id")?,
691 IndexImport {
692 desc: self
693 .index_desc
694 .into_rust_if_some("ProtoIndexImport::index_desc")?,
695 typ: self.typ.into_rust_if_some("ProtoIndexImport::typ")?,
696 monotonic: self.monotonic.into_rust()?,
697 },
698 ))
699 }
700}
701
702impl ProtoMapEntry<GlobalId, (IndexDesc, RelationType)> for ProtoIndexExport {
703 fn from_rust<'a>(
704 (id, (index_desc, typ)): (&'a GlobalId, &'a (IndexDesc, RelationType)),
705 ) -> Self {
706 ProtoIndexExport {
707 id: Some(id.into_proto()),
708 index_desc: Some(index_desc.into_proto()),
709 typ: Some(typ.into_proto()),
710 }
711 }
712
713 fn into_rust(self) -> Result<(GlobalId, (IndexDesc, RelationType)), TryFromProtoError> {
714 Ok((
715 self.id.into_rust_if_some("ProtoIndexExport::id")?,
716 (
717 self.index_desc
718 .into_rust_if_some("ProtoIndexExport::index_desc")?,
719 self.typ.into_rust_if_some("ProtoIndexExport::typ")?,
720 ),
721 ))
722 }
723}
724
725impl ProtoMapEntry<GlobalId, ComputeSinkDesc<CollectionMetadata>> for ProtoSinkExport {
726 fn from_rust<'a>(
727 (id, sink_desc): (&'a GlobalId, &'a ComputeSinkDesc<CollectionMetadata>),
728 ) -> Self {
729 ProtoSinkExport {
730 id: Some(id.into_proto()),
731 sink_desc: Some(sink_desc.into_proto()),
732 }
733 }
734
735 fn into_rust(
736 self,
737 ) -> Result<(GlobalId, ComputeSinkDesc<CollectionMetadata>), TryFromProtoError> {
738 Ok((
739 self.id.into_rust_if_some("ProtoSinkExport::id")?,
740 self.sink_desc
741 .into_rust_if_some("ProtoSinkExport::sink_desc")?,
742 ))
743 }
744}
745
746impl Arbitrary for DataflowDescription<RenderPlan, CollectionMetadata, mz_repr::Timestamp> {
747 type Strategy = BoxedStrategy<Self>;
748 type Parameters = ();
749
750 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
751 any_dataflow_description(any_source_import_collection_metadata()).boxed()
752 }
753}
754
755impl Arbitrary for DataflowDescription<OptimizedMirRelationExpr, (), mz_repr::Timestamp> {
756 type Strategy = BoxedStrategy<Self>;
757 type Parameters = ();
758
759 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
760 any_dataflow_description(any_source_import()).boxed()
761 }
762}
763
764impl Arbitrary for DataflowDescription<Plan, (), mz_repr::Timestamp> {
765 type Strategy = BoxedStrategy<Self>;
766 type Parameters = ();
767
768 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
769 any_dataflow_description(any_source_import()).boxed()
770 }
771}
772
773fn any_dataflow_description<P, S, T>(
774 any_source_import: impl Strategy<Value = (GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>))>,
775) -> impl Strategy<Value = DataflowDescription<P, S, T>>
776where
777 P: Arbitrary,
778 S: 'static + Arbitrary,
779 T: Arbitrary + timely::PartialOrder,
780 ComputeSinkDesc<S, T>: Arbitrary,
781{
782 (
785 (
786 proptest::collection::vec(any_source_import, 1..3),
787 proptest::collection::vec(any_dataflow_index_import(), 1..3),
788 proptest::collection::vec(any::<BuildDesc<P>>(), 1..3),
789 proptest::collection::vec(any_dataflow_index_export(), 1..3),
790 proptest::collection::vec(any::<(GlobalId, ComputeSinkDesc<S, T>)>(), 1..3),
791 any::<bool>(),
792 proptest::collection::vec(any::<T>(), 1..5),
793 any::<bool>(),
794 proptest::collection::vec(any::<T>(), 1..5),
795 any::<bool>(),
796 any::<RefreshSchedule>(),
797 proptest::string::string_regex(".*").unwrap(),
798 ),
799 any::<Option<TimeDependence>>(),
800 )
801 .prop_map(
802 |(
803 (
804 source_imports,
805 index_imports,
806 objects_to_build,
807 index_exports,
808 sink_descs,
809 as_of_some,
810 as_of,
811 initial_storage_as_of_some,
812 initial_as_of,
813 refresh_schedule_some,
814 refresh_schedule,
815 debug_name,
816 ),
817 time_dependence,
818 )| DataflowDescription {
819 source_imports: BTreeMap::from_iter(source_imports),
820 index_imports: BTreeMap::from_iter(index_imports),
821 objects_to_build,
822 index_exports: BTreeMap::from_iter(index_exports),
823 sink_exports: BTreeMap::from_iter(sink_descs),
824 as_of: if as_of_some {
825 Some(Antichain::from(as_of))
826 } else {
827 None
828 },
829 until: Antichain::new(),
830 initial_storage_as_of: if initial_storage_as_of_some {
831 Some(Antichain::from(initial_as_of))
832 } else {
833 None
834 },
835 refresh_schedule: if refresh_schedule_some {
836 Some(refresh_schedule)
837 } else {
838 None
839 },
840 debug_name,
841 time_dependence,
842 },
843 )
844}
845
846fn any_source_import_collection_metadata() -> impl Strategy<
847 Value = (
848 GlobalId,
849 (
850 SourceInstanceDesc<CollectionMetadata>,
851 bool,
852 Antichain<mz_repr::Timestamp>,
853 ),
854 ),
855> {
856 (
857 any::<GlobalId>(),
858 any::<(SourceInstanceDesc<CollectionMetadata>, bool)>().prop_map(
859 |(source_instance_desc, monotonic)| (source_instance_desc, monotonic, Antichain::new()),
860 ),
861 )
862}
863
864fn any_source_import() -> impl Strategy<
865 Value = (
866 GlobalId,
867 (SourceInstanceDesc<()>, bool, Antichain<mz_repr::Timestamp>),
868 ),
869> {
870 (any::<GlobalId>(), any::<(SourceInstanceDesc<()>, bool)>()).prop_map(
871 |(id, (source_instance_desc, monotonic))| {
872 (id, (source_instance_desc, monotonic, Antichain::new()))
873 },
874 )
875}
876
877proptest::prop_compose! {
878 fn any_dataflow_index_import()(
879 id in any::<GlobalId>(),
880 desc in any::<IndexDesc>(),
881 typ in any::<RelationType>(),
882 monotonic in any::<bool>(),
883 ) -> (GlobalId, IndexImport) {
884 (id, IndexImport {desc, typ, monotonic})
885 }
886}
887
888proptest::prop_compose! {
889 fn any_dataflow_index_export()(
890 id in any::<GlobalId>(),
891 index in any::<IndexDesc>(),
892 typ in any::<RelationType>(),
893 ) -> (GlobalId, (IndexDesc, RelationType)) {
894 (id, (index, typ))
895 }
896}
897
898pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;
900
901#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
904pub struct IndexDesc {
905 pub on_id: GlobalId,
907 #[proptest(strategy = "proptest::collection::vec(any::<MirScalarExpr>(), 1..3)")]
909 pub key: Vec<MirScalarExpr>,
910}
911
912impl RustType<ProtoIndexDesc> for IndexDesc {
913 fn into_proto(&self) -> ProtoIndexDesc {
914 ProtoIndexDesc {
915 on_id: Some(self.on_id.into_proto()),
916 key: self.key.into_proto(),
917 }
918 }
919
920 fn from_proto(proto: ProtoIndexDesc) -> Result<Self, TryFromProtoError> {
921 Ok(IndexDesc {
922 on_id: proto.on_id.into_rust_if_some("ProtoIndexDesc::on_id")?,
923 key: proto.key.into_rust()?,
924 })
925 }
926}
927
928#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
930pub struct IndexImport {
931 pub desc: IndexDesc,
933 pub typ: RelationType,
935 pub monotonic: bool,
937}
938
939#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
941pub struct BuildDesc<P> {
942 pub id: GlobalId,
944 pub plan: P,
946}
947
948impl RustType<ProtoBuildDesc> for BuildDesc<RenderPlan> {
949 fn into_proto(&self) -> ProtoBuildDesc {
950 ProtoBuildDesc {
951 id: Some(self.id.into_proto()),
952 plan: Some(self.plan.into_proto()),
953 }
954 }
955
956 fn from_proto(x: ProtoBuildDesc) -> Result<Self, TryFromProtoError> {
957 Ok(BuildDesc {
958 id: x.id.into_rust_if_some("ProtoBuildDesc::id")?,
959 plan: x.plan.into_rust_if_some("ProtoBuildDesc::plan")?,
960 })
961 }
962}
963
964#[cfg(test)]
965mod tests {
966 use mz_ore::assert_ok;
967 use mz_proto::protobuf_roundtrip;
968 use proptest::prelude::ProptestConfig;
969 use proptest::proptest;
970
971 use crate::dataflows::DataflowDescription;
972
973 use super::*;
974
975 proptest! {
976 #![proptest_config(ProptestConfig::with_cases(32))]
977
978
979 #[mz_ore::test]
980 fn dataflow_description_protobuf_roundtrip(expect in any::<DataflowDescription<RenderPlan, CollectionMetadata, mz_repr::Timestamp>>()) {
981 let actual = protobuf_roundtrip::<_, ProtoDataflowDescription>(&expect);
982 assert_ok!(actual);
983 assert_eq!(actual.unwrap(), expect);
984 }
985 }
986}