mz_compute_types/
dataflows.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for describing dataflows.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14
15use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
16use mz_ore::soft_assert_or_log;
17use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
18use mz_repr::refresh_schedule::RefreshSchedule;
19use mz_repr::{GlobalId, RelationType};
20use mz_storage_types::controller::CollectionMetadata;
21use mz_storage_types::time_dependence::TimeDependence;
22use proptest::prelude::{Arbitrary, any};
23use proptest::strategy::{BoxedStrategy, Strategy};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use timely::progress::Antichain;
27
28use crate::dataflows::proto_dataflow_description::{
29    ProtoIndexExport, ProtoIndexImport, ProtoSinkExport, ProtoSourceImport,
30};
31use crate::plan::Plan;
32use crate::plan::render_plan::RenderPlan;
33use crate::sinks::{ComputeSinkConnection, ComputeSinkDesc};
34use crate::sources::{SourceInstanceArguments, SourceInstanceDesc};
35
36include!(concat!(env!("OUT_DIR"), "/mz_compute_types.dataflows.rs"));
37
38/// A description of a dataflow to construct and results to surface.
39#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
40pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
41    /// Sources instantiations made available to the dataflow pair with monotonicity information.
42    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>)>,
43    /// Indexes made available to the dataflow.
44    /// (id of index, import)
45    pub index_imports: BTreeMap<GlobalId, IndexImport>,
46    /// Views and indexes to be built and stored in the local context.
47    /// Objects must be built in the specific order, as there may be
48    /// dependencies of later objects on prior identifiers.
49    pub objects_to_build: Vec<BuildDesc<P>>,
50    /// Indexes to be made available to be shared with other dataflows
51    /// (id of new index, description of index, relationtype of base source/view/table)
52    pub index_exports: BTreeMap<GlobalId, (IndexDesc, RelationType)>,
53    /// sinks to be created
54    /// (id of new sink, description of sink)
55    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
56    /// An optional frontier to which inputs should be advanced.
57    ///
58    /// If this is set, it should override the default setting determined by
59    /// the upper bound of `since` frontiers contributing to the dataflow.
60    /// It is an error for this to be set to a frontier not beyond that default.
61    pub as_of: Option<Antichain<T>>,
62    /// Frontier beyond which the dataflow should not execute.
63    /// Specifically, updates at times greater or equal to this frontier are suppressed.
64    /// This is often set to `as_of + 1` to enable "batch" computations.
65    /// Note that frontier advancements might still happen to times that are after the `until`,
66    /// only data is suppressed. (This is consistent with how frontier advancements can also
67    /// happen before the `as_of`.)
68    pub until: Antichain<T>,
69    /// The initial as_of when the collection is first created. Filled only for materialized views.
70    /// Note that this doesn't change upon restarts.
71    pub initial_storage_as_of: Option<Antichain<T>>,
72    /// The schedule of REFRESH materialized views.
73    pub refresh_schedule: Option<RefreshSchedule>,
74    /// Human-readable name
75    pub debug_name: String,
76    /// Description of how the dataflow's progress relates to wall-clock time. None for unknown.
77    pub time_dependence: Option<TimeDependence>,
78}
79
80impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
81    /// Tests if the dataflow refers to a single timestamp, namely
82    /// that `as_of` has a single coordinate and that the `until`
83    /// value corresponds to the `as_of` value plus one, or `as_of`
84    /// is the maximum timestamp and is thus single.
85    pub fn is_single_time(&self) -> bool {
86        // TODO: this would be much easier to check if `until` was a strict lower bound,
87        // and we would be testing that `until == as_of`.
88
89        let until = &self.until;
90
91        // IF `as_of` is not set at all this can't be a single time dataflow.
92        let Some(as_of) = self.as_of.as_ref() else {
93            return false;
94        };
95        // Ensure that as_of <= until.
96        soft_assert_or_log!(
97            timely::PartialOrder::less_equal(as_of, until),
98            "expected empty `as_of ≤ until`, got `{as_of:?} ≰ {until:?}`",
99        );
100        // IF `as_of` is not a single timestamp this can't be a single time dataflow.
101        let Some(as_of) = as_of.as_option() else {
102            return false;
103        };
104        // Ensure that `as_of = MAX` implies `until.is_empty()`.
105        soft_assert_or_log!(
106            as_of != &mz_repr::Timestamp::MAX || until.is_empty(),
107            "expected `until = {{}}` due to `as_of = MAX`, got `until = {until:?}`",
108        );
109        // Note that the `(as_of = MAX, until = {})` case also returns `true`
110        // here (as expected) since we are going to compare two `None` values.
111        as_of.try_step_forward().as_ref() == until.as_option()
112    }
113}
114
115impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
116    /// Check invariants expected to be true about `DataflowDescription`s.
117    pub fn check_invariants(&self) -> Result<(), String> {
118        let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect();
119        let mut lir_ids = BTreeSet::new();
120
121        while let Some(plan) = plans.pop() {
122            let lir_id = plan.lir_id;
123            if !lir_ids.insert(lir_id) {
124                return Err(format!(
125                    "duplicate `LirId` in `DataflowDescription`: {lir_id}"
126                ));
127            }
128            plans.extend(plan.node.children());
129        }
130
131        Ok(())
132    }
133}
134
135impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
136    /// Creates a new dataflow description with a human-readable name.
137    pub fn new(name: String) -> Self {
138        Self {
139            source_imports: Default::default(),
140            index_imports: Default::default(),
141            objects_to_build: Vec::new(),
142            index_exports: Default::default(),
143            sink_exports: Default::default(),
144            as_of: Default::default(),
145            until: Antichain::new(),
146            initial_storage_as_of: None,
147            refresh_schedule: None,
148            debug_name: name,
149            time_dependence: None,
150        }
151    }
152
153    /// Imports a previously exported index.
154    ///
155    /// This method makes available an index previously exported as `id`, identified
156    /// to the query by `description` (which names the view the index arranges, and
157    /// the keys by which it is arranged).
158    pub fn import_index(
159        &mut self,
160        id: GlobalId,
161        desc: IndexDesc,
162        typ: RelationType,
163        monotonic: bool,
164    ) {
165        self.index_imports.insert(
166            id,
167            IndexImport {
168                desc,
169                typ,
170                monotonic,
171            },
172        );
173    }
174
175    /// Imports a source and makes it available as `id`.
176    pub fn import_source(&mut self, id: GlobalId, typ: RelationType, monotonic: bool) {
177        // Import the source with no linear operators applied to it.
178        // They may be populated by whole-dataflow optimization.
179        self.source_imports.insert(
180            id,
181            (
182                SourceInstanceDesc {
183                    storage_metadata: (),
184                    arguments: SourceInstanceArguments { operators: None },
185                    typ,
186                },
187                monotonic,
188                Antichain::new(),
189            ),
190        );
191    }
192
193    /// Binds to `id` the relation expression `plan`.
194    pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
195        self.objects_to_build.push(BuildDesc { id, plan });
196    }
197
198    /// Exports as `id` an index described by `description`.
199    ///
200    /// Future uses of `import_index` in other dataflow descriptions may use `id`,
201    /// as long as this dataflow has not been terminated in the meantime.
202    pub fn export_index(&mut self, id: GlobalId, description: IndexDesc, on_type: RelationType) {
203        // We first create a "view" named `id` that ensures that the
204        // data are correctly arranged and available for export.
205        self.insert_plan(
206            id,
207            OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
208                input: Box::new(MirRelationExpr::global_get(
209                    description.on_id,
210                    on_type.clone(),
211                )),
212                keys: vec![description.key.clone()],
213            }),
214        );
215        self.index_exports.insert(id, (description, on_type));
216    }
217
218    /// Exports as `id` a sink described by `description`.
219    pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) {
220        self.sink_exports.insert(id, description);
221    }
222
223    /// Returns true iff `id` is already imported.
224    pub fn is_imported(&self, id: &GlobalId) -> bool {
225        self.objects_to_build.iter().any(|bd| &bd.id == id)
226            || self.index_imports.keys().any(|i| i == id)
227            || self.source_imports.keys().any(|i| i == id)
228    }
229
230    /// The number of columns associated with an identifier in the dataflow.
231    pub fn arity_of(&self, id: &GlobalId) -> usize {
232        for (source_id, (source, _monotonic, _upper)) in self.source_imports.iter() {
233            if source_id == id {
234                return source.typ.arity();
235            }
236        }
237        for IndexImport { desc, typ, .. } in self.index_imports.values() {
238            if &desc.on_id == id {
239                return typ.arity();
240            }
241        }
242        for desc in self.objects_to_build.iter() {
243            if &desc.id == id {
244                return desc.plan.arity();
245            }
246        }
247        panic!("GlobalId {} not found in DataflowDesc", id);
248    }
249
250    /// Calls r and s on any sub-members of those types in self. Halts at the first error return.
251    pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
252    where
253        R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
254        S: Fn(&mut MirScalarExpr) -> Result<(), E>,
255    {
256        for BuildDesc { plan, .. } in &mut self.objects_to_build {
257            r(plan)?;
258        }
259        for (source_instance_desc, _, _upper) in self.source_imports.values_mut() {
260            let Some(mfp) = source_instance_desc.arguments.operators.as_mut() else {
261                continue;
262            };
263            for expr in mfp.expressions.iter_mut() {
264                s(expr)?;
265            }
266            for (_, expr) in mfp.predicates.iter_mut() {
267                s(expr)?;
268            }
269        }
270        Ok(())
271    }
272}
273
274impl<P, S, T> DataflowDescription<P, S, T> {
275    /// Sets the `as_of` frontier to the supplied argument.
276    ///
277    /// This method allows the dataflow to indicate a frontier up through
278    /// which all times should be advanced. This can be done for at least
279    /// two reasons: 1. correctness and 2. performance.
280    ///
281    /// Correctness may require an `as_of` to ensure that historical detail
282    /// is consolidated at representative times that do not present specific
283    /// detail that is not specifically correct. For example, updates may be
284    /// compacted to times that are no longer the source times, but instead
285    /// some byproduct of when compaction was executed; we should not present
286    /// those specific times as meaningfully different from other equivalent
287    /// times.
288    ///
289    /// Performance may benefit from an aggressive `as_of` as it reduces the
290    /// number of distinct moments at which collections vary. Differential
291    /// dataflow will refresh its outputs at each time its inputs change and
292    /// to moderate that we can minimize the volume of distinct input times
293    /// as much as possible.
294    ///
295    /// Generally, one should consider setting `as_of` at least to the `since`
296    /// frontiers of contributing data sources and as aggressively as the
297    /// computation permits.
298    pub fn set_as_of(&mut self, as_of: Antichain<T>) {
299        self.as_of = Some(as_of);
300    }
301
302    /// Records the initial `as_of` of the storage collection associated with a materialized view.
303    pub fn set_initial_as_of(&mut self, initial_as_of: Antichain<T>) {
304        self.initial_storage_as_of = Some(initial_as_of);
305    }
306
307    /// Identifiers of imported objects (indexes and sources).
308    pub fn import_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
309        self.imported_index_ids().chain(self.imported_source_ids())
310    }
311
312    /// Identifiers of imported indexes.
313    pub fn imported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
314        self.index_imports.keys().copied()
315    }
316
317    /// Identifiers of imported sources.
318    pub fn imported_source_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
319        self.source_imports.keys().copied()
320    }
321
322    /// Identifiers of exported objects (indexes and sinks).
323    pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
324        self.exported_index_ids().chain(self.exported_sink_ids())
325    }
326
327    /// Identifiers of exported indexes.
328    pub fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
329        self.index_exports.keys().copied()
330    }
331
332    /// Identifiers of exported sinks.
333    pub fn exported_sink_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
334        self.sink_exports.keys().copied()
335    }
336
337    /// Identifiers of exported persist sinks.
338    pub fn persist_sink_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
339        self.sink_exports
340            .iter()
341            .filter_map(|(id, desc)| match desc.connection {
342                ComputeSinkConnection::MaterializedView(_) => Some(*id),
343                ComputeSinkConnection::ContinualTask(_) => Some(*id),
344                _ => None,
345            })
346    }
347
348    /// Identifiers of exported subscribe sinks.
349    pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
350        self.sink_exports
351            .iter()
352            .filter_map(|(id, desc)| match desc.connection {
353                ComputeSinkConnection::Subscribe(_) => Some(*id),
354                _ => None,
355            })
356    }
357
358    /// Identifiers of exported continual tasks.
359    pub fn continual_task_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
360        self.sink_exports
361            .iter()
362            .filter_map(|(id, desc)| match desc.connection {
363                ComputeSinkConnection::ContinualTask(_) => Some(*id),
364                _ => None,
365            })
366    }
367
368    /// Identifiers of exported copy to sinks.
369    pub fn copy_to_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
370        self.sink_exports
371            .iter()
372            .filter_map(|(id, desc)| match desc.connection {
373                ComputeSinkConnection::CopyToS3Oneshot(_) => Some(*id),
374                _ => None,
375            })
376    }
377
378    /// Produce a `Display`able value containing the import IDs of this dataflow.
379    pub fn display_import_ids(&self) -> impl fmt::Display + '_ {
380        use mz_ore::str::{bracketed, separated};
381        bracketed("[", "]", separated(", ", self.import_ids()))
382    }
383
384    /// Produce a `Display`able value containing the export IDs of this dataflow.
385    pub fn display_export_ids(&self) -> impl fmt::Display + '_ {
386        use mz_ore::str::{bracketed, separated};
387        bracketed("[", "]", separated(", ", self.export_ids()))
388    }
389
390    /// Whether this dataflow installs transient collections.
391    pub fn is_transient(&self) -> bool {
392        self.export_ids().all(|id| id.is_transient())
393    }
394
395    /// Returns the description of the object to build with the specified
396    /// identifier.
397    ///
398    /// # Panics
399    ///
400    /// Panics if `id` is not present in `objects_to_build` exactly once.
401    pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
402        let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
403        let build = builds
404            .next()
405            .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
406        assert!(builds.next().is_none());
407        build
408    }
409}
410
411impl<P, S, T> DataflowDescription<P, S, T>
412where
413    P: CollectionPlan,
414{
415    /// Computes the set of identifiers upon which the specified collection
416    /// identifier depends.
417    ///
418    /// `collection_id` must specify a valid object in `objects_to_build`.
419    ///
420    /// This method includes identifiers for e.g. intermediate views, and should be filtered
421    /// if one only wants sources and indexes.
422    ///
423    /// This method is safe for mutually recursive view definitions.
424    pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
425        let mut out = BTreeSet::new();
426        self.depends_on_into(collection_id, &mut out);
427        out
428    }
429
430    /// Like `depends_on`, but appends to an existing `BTreeSet`.
431    pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
432        out.insert(collection_id);
433        if self.source_imports.contains_key(&collection_id) {
434            // The collection is provided by an imported source. Report the
435            // dependency on the source.
436            out.insert(collection_id);
437            return;
438        }
439
440        // NOTE(benesch): we're not smart enough here to know *which* index
441        // for the collection will be used, if one exists, so we have to report
442        // the dependency on all of them.
443        let mut found_index = false;
444        for (index_id, IndexImport { desc, .. }) in &self.index_imports {
445            if desc.on_id == collection_id {
446                // The collection is provided by an imported index. Report the
447                // dependency on the index.
448                out.insert(*index_id);
449                found_index = true;
450            }
451        }
452        if found_index {
453            return;
454        }
455
456        // The collection is not provided by a source or imported index.
457        // It must be a collection whose plan we have handy. Recurse.
458        let build = self.build_desc(collection_id);
459        for id in build.plan.depends_on() {
460            if !out.contains(&id) {
461                self.depends_on_into(id, out)
462            }
463        }
464    }
465
466    /// Computes the set of imports upon which the specified collection depends.
467    ///
468    /// This method behaves like `depends_on` but filters out internal dependencies that are not
469    /// included in the dataflow imports.
470    pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
471        let is_import = |id: &GlobalId| {
472            self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
473        };
474
475        let deps = self.depends_on(collection_id);
476        deps.into_iter().filter(is_import).collect()
477    }
478}
479
480impl<S, T> DataflowDescription<RenderPlan, S, T>
481where
482    S: Clone + PartialEq,
483    T: Clone + timely::PartialOrder,
484{
485    /// Determine if a dataflow description is compatible with this dataflow description.
486    ///
487    /// Compatible dataflows have structurally equal exports, imports, and objects to build. The
488    /// `as_of` of the receiver has to be less equal the `other` `as_of`.
489    ///
490    /// Note that this method performs normalization as part of the structural equality checking,
491    /// which involves cloning both `self` and `other`. It is therefore relatively expensive and
492    /// should only be used on cold code paths.
493    ///
494    // TODO: The semantics of this function are only useful for command reconciliation at the moment.
495    pub fn compatible_with(&self, other: &Self) -> bool {
496        let old = self.as_comparable();
497        let new = other.as_comparable();
498
499        let equality = old.index_exports == new.index_exports
500            && old.sink_exports == new.sink_exports
501            && old.objects_to_build == new.objects_to_build
502            && old.index_imports == new.index_imports
503            && old.source_imports == new.source_imports
504            && old.time_dependence == new.time_dependence;
505
506        let partial = if let (Some(old_as_of), Some(new_as_of)) = (&old.as_of, &new.as_of) {
507            timely::PartialOrder::less_equal(old_as_of, new_as_of)
508        } else {
509            false
510        };
511
512        equality && partial
513    }
514
515    /// Returns a `DataflowDescription` that has the same structure as `self` and can be
516    /// structurally compared to other `DataflowDescription`s.
517    ///
518    /// The function normalizes several properties. It replaces transient `GlobalId`s
519    /// that are only used internally (i.e. not imported nor exported) with consecutive IDs
520    /// starting from `t1`. It replaces the source import's `upper` by a dummy value.
521    fn as_comparable(&self) -> Self {
522        let external_ids: BTreeSet<_> = self.import_ids().chain(self.export_ids()).collect();
523
524        let mut id_counter = 0;
525        let mut replacements = BTreeMap::new();
526
527        let mut maybe_replace = |id: GlobalId| {
528            if id.is_transient() && !external_ids.contains(&id) {
529                *replacements.entry(id).or_insert_with(|| {
530                    id_counter += 1;
531                    GlobalId::Transient(id_counter)
532                })
533            } else {
534                id
535            }
536        };
537
538        let mut source_imports = self.source_imports.clone();
539        for (_source, _monotonic, upper) in source_imports.values_mut() {
540            *upper = Antichain::new();
541        }
542
543        let mut objects_to_build = self.objects_to_build.clone();
544        for object in &mut objects_to_build {
545            object.id = maybe_replace(object.id);
546            object.plan.replace_ids(&mut maybe_replace);
547        }
548
549        let mut index_exports = self.index_exports.clone();
550        for (desc, _typ) in index_exports.values_mut() {
551            desc.on_id = maybe_replace(desc.on_id);
552        }
553
554        let mut sink_exports = self.sink_exports.clone();
555        for desc in sink_exports.values_mut() {
556            desc.from = maybe_replace(desc.from);
557        }
558
559        DataflowDescription {
560            source_imports,
561            index_imports: self.index_imports.clone(),
562            objects_to_build,
563            index_exports,
564            sink_exports,
565            as_of: self.as_of.clone(),
566            until: self.until.clone(),
567            initial_storage_as_of: self.initial_storage_as_of.clone(),
568            refresh_schedule: self.refresh_schedule.clone(),
569            debug_name: self.debug_name.clone(),
570            time_dependence: self.time_dependence.clone(),
571        }
572    }
573}
574
575impl RustType<ProtoDataflowDescription> for DataflowDescription<RenderPlan, CollectionMetadata> {
576    fn into_proto(&self) -> ProtoDataflowDescription {
577        ProtoDataflowDescription {
578            source_imports: self.source_imports.into_proto(),
579            index_imports: self.index_imports.into_proto(),
580            objects_to_build: self.objects_to_build.into_proto(),
581            index_exports: self.index_exports.into_proto(),
582            sink_exports: self.sink_exports.into_proto(),
583            as_of: self.as_of.into_proto(),
584            until: Some(self.until.into_proto()),
585            initial_storage_as_of: self.initial_storage_as_of.into_proto(),
586            refresh_schedule: self.refresh_schedule.into_proto(),
587            debug_name: self.debug_name.clone(),
588            time_dependence: self.time_dependence.into_proto(),
589        }
590    }
591
592    fn from_proto(proto: ProtoDataflowDescription) -> Result<Self, TryFromProtoError> {
593        Ok(DataflowDescription {
594            source_imports: proto.source_imports.into_rust()?,
595            index_imports: proto.index_imports.into_rust()?,
596            objects_to_build: proto.objects_to_build.into_rust()?,
597            index_exports: proto.index_exports.into_rust()?,
598            sink_exports: proto.sink_exports.into_rust()?,
599            as_of: proto.as_of.map(|x| x.into_rust()).transpose()?,
600            until: proto
601                .until
602                .map(|x| x.into_rust())
603                .transpose()?
604                .unwrap_or_else(Antichain::new),
605            initial_storage_as_of: proto
606                .initial_storage_as_of
607                .map(|x| x.into_rust())
608                .transpose()?,
609            refresh_schedule: proto.refresh_schedule.into_rust()?,
610            debug_name: proto.debug_name,
611            time_dependence: proto.time_dependence.into_rust()?,
612        })
613    }
614}
615
616impl
617    ProtoMapEntry<
618        GlobalId,
619        (
620            SourceInstanceDesc<CollectionMetadata>,
621            bool,
622            Antichain<mz_repr::Timestamp>,
623        ),
624    > for ProtoSourceImport
625{
626    fn from_rust<'a>(
627        entry: (
628            &'a GlobalId,
629            &'a (
630                SourceInstanceDesc<CollectionMetadata>,
631                bool,
632                Antichain<mz_repr::Timestamp>,
633            ),
634        ),
635    ) -> Self {
636        ProtoSourceImport {
637            id: Some(entry.0.into_proto()),
638            source_instance_desc: Some((entry.1).0.into_proto()),
639            monotonic: (entry.1).1.into_proto(),
640            upper: Some((entry.1).2.into_proto()),
641        }
642    }
643
644    fn into_rust(
645        self,
646    ) -> Result<
647        (
648            GlobalId,
649            (
650                SourceInstanceDesc<CollectionMetadata>,
651                bool,
652                Antichain<mz_repr::Timestamp>,
653            ),
654        ),
655        TryFromProtoError,
656    > {
657        Ok((
658            self.id.into_rust_if_some("ProtoSourceImport::id")?,
659            (
660                self.source_instance_desc
661                    .into_rust_if_some("ProtoSourceImport::source_instance_desc")?,
662                self.monotonic.into_rust()?,
663                self.upper.into_rust_if_some("ProtoSourceImport::upper")?,
664            ),
665        ))
666    }
667}
668
669impl ProtoMapEntry<GlobalId, IndexImport> for ProtoIndexImport {
670    fn from_rust<'a>(
671        (
672            id,
673            IndexImport {
674                desc,
675                typ,
676                monotonic,
677            },
678        ): (&'a GlobalId, &'a IndexImport),
679    ) -> Self {
680        ProtoIndexImport {
681            id: Some(id.into_proto()),
682            index_desc: Some(desc.into_proto()),
683            typ: Some(typ.into_proto()),
684            monotonic: monotonic.into_proto(),
685        }
686    }
687
688    fn into_rust(self) -> Result<(GlobalId, IndexImport), TryFromProtoError> {
689        Ok((
690            self.id.into_rust_if_some("ProtoIndex::id")?,
691            IndexImport {
692                desc: self
693                    .index_desc
694                    .into_rust_if_some("ProtoIndexImport::index_desc")?,
695                typ: self.typ.into_rust_if_some("ProtoIndexImport::typ")?,
696                monotonic: self.monotonic.into_rust()?,
697            },
698        ))
699    }
700}
701
702impl ProtoMapEntry<GlobalId, (IndexDesc, RelationType)> for ProtoIndexExport {
703    fn from_rust<'a>(
704        (id, (index_desc, typ)): (&'a GlobalId, &'a (IndexDesc, RelationType)),
705    ) -> Self {
706        ProtoIndexExport {
707            id: Some(id.into_proto()),
708            index_desc: Some(index_desc.into_proto()),
709            typ: Some(typ.into_proto()),
710        }
711    }
712
713    fn into_rust(self) -> Result<(GlobalId, (IndexDesc, RelationType)), TryFromProtoError> {
714        Ok((
715            self.id.into_rust_if_some("ProtoIndexExport::id")?,
716            (
717                self.index_desc
718                    .into_rust_if_some("ProtoIndexExport::index_desc")?,
719                self.typ.into_rust_if_some("ProtoIndexExport::typ")?,
720            ),
721        ))
722    }
723}
724
725impl ProtoMapEntry<GlobalId, ComputeSinkDesc<CollectionMetadata>> for ProtoSinkExport {
726    fn from_rust<'a>(
727        (id, sink_desc): (&'a GlobalId, &'a ComputeSinkDesc<CollectionMetadata>),
728    ) -> Self {
729        ProtoSinkExport {
730            id: Some(id.into_proto()),
731            sink_desc: Some(sink_desc.into_proto()),
732        }
733    }
734
735    fn into_rust(
736        self,
737    ) -> Result<(GlobalId, ComputeSinkDesc<CollectionMetadata>), TryFromProtoError> {
738        Ok((
739            self.id.into_rust_if_some("ProtoSinkExport::id")?,
740            self.sink_desc
741                .into_rust_if_some("ProtoSinkExport::sink_desc")?,
742        ))
743    }
744}
745
746impl Arbitrary for DataflowDescription<RenderPlan, CollectionMetadata, mz_repr::Timestamp> {
747    type Strategy = BoxedStrategy<Self>;
748    type Parameters = ();
749
750    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
751        any_dataflow_description(any_source_import_collection_metadata()).boxed()
752    }
753}
754
755impl Arbitrary for DataflowDescription<OptimizedMirRelationExpr, (), mz_repr::Timestamp> {
756    type Strategy = BoxedStrategy<Self>;
757    type Parameters = ();
758
759    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
760        any_dataflow_description(any_source_import()).boxed()
761    }
762}
763
764impl Arbitrary for DataflowDescription<Plan, (), mz_repr::Timestamp> {
765    type Strategy = BoxedStrategy<Self>;
766    type Parameters = ();
767
768    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
769        any_dataflow_description(any_source_import()).boxed()
770    }
771}
772
773fn any_dataflow_description<P, S, T>(
774    any_source_import: impl Strategy<Value = (GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>))>,
775) -> impl Strategy<Value = DataflowDescription<P, S, T>>
776where
777    P: Arbitrary,
778    S: 'static + Arbitrary,
779    T: Arbitrary + timely::PartialOrder,
780    ComputeSinkDesc<S, T>: Arbitrary,
781{
782    // `prop_map` is only implemented for tuples of 12 elements or less, so we need to use nested
783    // tuples.
784    (
785        (
786            proptest::collection::vec(any_source_import, 1..3),
787            proptest::collection::vec(any_dataflow_index_import(), 1..3),
788            proptest::collection::vec(any::<BuildDesc<P>>(), 1..3),
789            proptest::collection::vec(any_dataflow_index_export(), 1..3),
790            proptest::collection::vec(any::<(GlobalId, ComputeSinkDesc<S, T>)>(), 1..3),
791            any::<bool>(),
792            proptest::collection::vec(any::<T>(), 1..5),
793            any::<bool>(),
794            proptest::collection::vec(any::<T>(), 1..5),
795            any::<bool>(),
796            any::<RefreshSchedule>(),
797            proptest::string::string_regex(".*").unwrap(),
798        ),
799        any::<Option<TimeDependence>>(),
800    )
801        .prop_map(
802            |(
803                (
804                    source_imports,
805                    index_imports,
806                    objects_to_build,
807                    index_exports,
808                    sink_descs,
809                    as_of_some,
810                    as_of,
811                    initial_storage_as_of_some,
812                    initial_as_of,
813                    refresh_schedule_some,
814                    refresh_schedule,
815                    debug_name,
816                ),
817                time_dependence,
818            )| DataflowDescription {
819                source_imports: BTreeMap::from_iter(source_imports),
820                index_imports: BTreeMap::from_iter(index_imports),
821                objects_to_build,
822                index_exports: BTreeMap::from_iter(index_exports),
823                sink_exports: BTreeMap::from_iter(sink_descs),
824                as_of: if as_of_some {
825                    Some(Antichain::from(as_of))
826                } else {
827                    None
828                },
829                until: Antichain::new(),
830                initial_storage_as_of: if initial_storage_as_of_some {
831                    Some(Antichain::from(initial_as_of))
832                } else {
833                    None
834                },
835                refresh_schedule: if refresh_schedule_some {
836                    Some(refresh_schedule)
837                } else {
838                    None
839                },
840                debug_name,
841                time_dependence,
842            },
843        )
844}
845
846fn any_source_import_collection_metadata() -> impl Strategy<
847    Value = (
848        GlobalId,
849        (
850            SourceInstanceDesc<CollectionMetadata>,
851            bool,
852            Antichain<mz_repr::Timestamp>,
853        ),
854    ),
855> {
856    (
857        any::<GlobalId>(),
858        any::<(SourceInstanceDesc<CollectionMetadata>, bool)>().prop_map(
859            |(source_instance_desc, monotonic)| (source_instance_desc, monotonic, Antichain::new()),
860        ),
861    )
862}
863
864fn any_source_import() -> impl Strategy<
865    Value = (
866        GlobalId,
867        (SourceInstanceDesc<()>, bool, Antichain<mz_repr::Timestamp>),
868    ),
869> {
870    (any::<GlobalId>(), any::<(SourceInstanceDesc<()>, bool)>()).prop_map(
871        |(id, (source_instance_desc, monotonic))| {
872            (id, (source_instance_desc, monotonic, Antichain::new()))
873        },
874    )
875}
876
877proptest::prop_compose! {
878    fn any_dataflow_index_import()(
879        id in any::<GlobalId>(),
880        desc in any::<IndexDesc>(),
881        typ in any::<RelationType>(),
882        monotonic in any::<bool>(),
883    ) -> (GlobalId, IndexImport) {
884        (id, IndexImport {desc, typ, monotonic})
885    }
886}
887
888proptest::prop_compose! {
889    fn any_dataflow_index_export()(
890        id in any::<GlobalId>(),
891        index in any::<IndexDesc>(),
892        typ in any::<RelationType>(),
893    ) -> (GlobalId, (IndexDesc, RelationType)) {
894        (id, (index, typ))
895    }
896}
897
898/// A commonly used name for dataflows contain MIR expressions.
899pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;
900
901/// An index storing processed updates so they can be queried
902/// or reused in other computations
903#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
904pub struct IndexDesc {
905    /// Identity of the collection the index is on.
906    pub on_id: GlobalId,
907    /// Expressions to be arranged, in order of decreasing primacy.
908    #[proptest(strategy = "proptest::collection::vec(any::<MirScalarExpr>(), 1..3)")]
909    pub key: Vec<MirScalarExpr>,
910}
911
912impl RustType<ProtoIndexDesc> for IndexDesc {
913    fn into_proto(&self) -> ProtoIndexDesc {
914        ProtoIndexDesc {
915            on_id: Some(self.on_id.into_proto()),
916            key: self.key.into_proto(),
917        }
918    }
919
920    fn from_proto(proto: ProtoIndexDesc) -> Result<Self, TryFromProtoError> {
921        Ok(IndexDesc {
922            on_id: proto.on_id.into_rust_if_some("ProtoIndexDesc::on_id")?,
923            key: proto.key.into_rust()?,
924        })
925    }
926}
927
928/// Information about an imported index, and how it will be used by the dataflow.
929#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
930pub struct IndexImport {
931    /// Description of index.
932    pub desc: IndexDesc,
933    /// Schema and keys of the object the index is on.
934    pub typ: RelationType,
935    /// Whether the index will supply monotonic data.
936    pub monotonic: bool,
937}
938
939/// An association of a global identifier to an expression.
940#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
941pub struct BuildDesc<P> {
942    /// TODO(database-issues#7533): Add documentation.
943    pub id: GlobalId,
944    /// TODO(database-issues#7533): Add documentation.
945    pub plan: P,
946}
947
948impl RustType<ProtoBuildDesc> for BuildDesc<RenderPlan> {
949    fn into_proto(&self) -> ProtoBuildDesc {
950        ProtoBuildDesc {
951            id: Some(self.id.into_proto()),
952            plan: Some(self.plan.into_proto()),
953        }
954    }
955
956    fn from_proto(x: ProtoBuildDesc) -> Result<Self, TryFromProtoError> {
957        Ok(BuildDesc {
958            id: x.id.into_rust_if_some("ProtoBuildDesc::id")?,
959            plan: x.plan.into_rust_if_some("ProtoBuildDesc::plan")?,
960        })
961    }
962}
963
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use crate::dataflows::DataflowDescription;

    use super::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        // Encoding an arbitrary dataflow description to protobuf and decoding it again must
        // succeed and yield a value equal to the original.
        #[mz_ore::test]
        fn dataflow_description_protobuf_roundtrip(original in any::<DataflowDescription<RenderPlan, CollectionMetadata, mz_repr::Timestamp>>()) {
            let decoded = protobuf_roundtrip::<_, ProtoDataflowDescription>(&original);
            assert_ok!(decoded);
            assert_eq!(decoded.unwrap(), original);
        }
    }
}