mz_compute_types/dataflows.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types for describing dataflows.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;

use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_or_log;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{GlobalId, SqlRelationType};
use mz_storage_types::time_dependence::TimeDependence;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;

use crate::plan::Plan;
use crate::plan::render_plan::RenderPlan;
use crate::sinks::{ComputeSinkConnection, ComputeSinkDesc};
use crate::sources::{SourceInstanceArguments, SourceInstanceDesc};

/// A description of a dataflow to construct and results to surface.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
    /// Source instantiations made available to the dataflow, paired with
    /// monotonicity information and the source's upper frontier.
    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>)>,
    /// Indexes made available to the dataflow.
    /// (id of index, import)
    pub index_imports: BTreeMap<GlobalId, IndexImport>,
    /// Views and indexes to be built and stored in the local context.
    /// Objects must be built in the specified order, as there may be
    /// dependencies of later objects on prior identifiers.
    pub objects_to_build: Vec<BuildDesc<P>>,
    /// Indexes to be made available to be shared with other dataflows.
    /// (id of new index, description of index, relation type of base source/view/table)
    pub index_exports: BTreeMap<GlobalId, (IndexDesc, SqlRelationType)>,
    /// Sinks to be created.
    /// (id of new sink, description of sink)
    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
    /// An optional frontier to which inputs should be advanced.
    ///
    /// If this is set, it should override the default setting determined by
    /// the upper bound of `since` frontiers contributing to the dataflow.
    /// It is an error for this to be set to a frontier not beyond that default.
    pub as_of: Option<Antichain<T>>,
    /// Frontier beyond which the dataflow should not execute.
    /// Specifically, updates at times greater or equal to this frontier are suppressed.
    /// This is often set to `as_of + 1` to enable "batch" computations.
    /// Note that frontier advancements might still happen to times after the `until`;
    /// only data is suppressed. (This is consistent with how frontier advancements can
    /// also happen before the `as_of`.)
    pub until: Antichain<T>,
    /// The initial as_of when the collection is first created. Filled only for materialized views.
    /// Note that this doesn't change upon restarts.
    pub initial_storage_as_of: Option<Antichain<T>>,
    /// The schedule of REFRESH materialized views.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Human-readable name.
    pub debug_name: String,
    /// Description of how the dataflow's progress relates to wall-clock time. None for unknown.
    pub time_dependence: Option<TimeDependence>,
}

impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
    /// Tests whether the dataflow refers to a single timestamp: namely, that
    /// `as_of` has a single coordinate and that the `until` value corresponds
    /// to the `as_of` value plus one, or that `as_of` is the maximum timestamp
    /// and is thus single.
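    ///
    /// A minimal sketch of the intended usage (marked `ignore`; the exact
    /// construction of the frontiers is illustrative):
    ///
    /// ```ignore
    /// let mut dataflow = DataflowDescription::<(), ()>::new("example".into());
    /// dataflow.set_as_of(Antichain::from_elem(mz_repr::Timestamp::from(5u64)));
    /// // `until = as_of + 1` marks this as a single-time ("batch") dataflow.
    /// dataflow.until = Antichain::from_elem(mz_repr::Timestamp::from(6u64));
    /// assert!(dataflow.is_single_time());
    /// ```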
    pub fn is_single_time(&self) -> bool {
        // TODO: this would be much easier to check if `until` was a strict lower bound,
        // and we would be testing that `until == as_of`.

        let until = &self.until;

        // If `as_of` is not set at all, this can't be a single-time dataflow.
        let Some(as_of) = self.as_of.as_ref() else {
            return false;
        };
        // Ensure that `as_of <= until`.
        soft_assert_or_log!(
            timely::PartialOrder::less_equal(as_of, until),
            "expected `as_of ≤ until`, got `{as_of:?} ≰ {until:?}`",
        );
        // If `as_of` is not a single timestamp, this can't be a single-time dataflow.
        let Some(as_of) = as_of.as_option() else {
            return false;
        };
        // Ensure that `as_of = MAX` implies `until.is_empty()`.
        soft_assert_or_log!(
            as_of != &mz_repr::Timestamp::MAX || until.is_empty(),
            "expected `until = {{}}` due to `as_of = MAX`, got `until = {until:?}`",
        );
        // Note that the `(as_of = MAX, until = {})` case also returns `true`
        // here (as expected) since we are going to compare two `None` values.
        as_of.try_step_forward().as_ref() == until.as_option()
    }
}

impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
    /// Check invariants expected to be true about `DataflowDescription`s.
    pub fn check_invariants(&self) -> Result<(), String> {
        let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect();
        let mut lir_ids = BTreeSet::new();

        while let Some(plan) = plans.pop() {
            let lir_id = plan.lir_id;
            if !lir_ids.insert(lir_id) {
                return Err(format!(
                    "duplicate `LirId` in `DataflowDescription`: {lir_id}"
                ));
            }
            plans.extend(plan.node.children());
        }

        Ok(())
    }
}

impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
    /// Imports a previously exported index.
    ///
    /// This method makes available an index previously exported as `id`, identified
    /// to the query by `desc` (which names the view the index arranges, and
    /// the keys by which it is arranged).
    pub fn import_index(
        &mut self,
        id: GlobalId,
        desc: IndexDesc,
        typ: SqlRelationType,
        monotonic: bool,
    ) {
        self.index_imports.insert(
            id,
            IndexImport {
                desc,
                typ,
                monotonic,
            },
        );
    }

    /// Imports a source and makes it available as `id`.
    pub fn import_source(&mut self, id: GlobalId, typ: SqlRelationType, monotonic: bool) {
        // Import the source with no linear operators applied to it.
        // They may be populated by whole-dataflow optimization.
        self.source_imports.insert(
            id,
            (
                SourceInstanceDesc {
                    storage_metadata: (),
                    arguments: SourceInstanceArguments { operators: None },
                    typ,
                },
                monotonic,
                Antichain::new(),
            ),
        );
    }

    /// Binds to `id` the relation expression `plan`.
    pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
        self.objects_to_build.push(BuildDesc { id, plan });
    }

    /// Exports as `id` an index described by `description`.
    ///
    /// Future uses of `import_index` in other dataflow descriptions may use `id`,
    /// as long as this dataflow has not been terminated in the meantime.
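    ///
    /// A hedged sketch of the import/build/export flow (marked `ignore`; the
    /// IDs, descriptions, and types are illustrative):
    ///
    /// ```ignore
    /// let mut dataflow = DataflowDesc::new("my_index".into());
    /// // Make the indexed collection available to this dataflow ...
    /// dataflow.import_index(old_index_id, old_index_desc, on_type.clone(), false);
    /// // ... and export a new arrangement of the same collection.
    /// dataflow.export_index(new_index_id, new_index_desc, on_type);
    /// ```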
    pub fn export_index(&mut self, id: GlobalId, description: IndexDesc, on_type: SqlRelationType) {
        // We first create a "view" named `id` that ensures that the
        // data are correctly arranged and available for export.
        self.insert_plan(
            id,
            OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
                input: Box::new(MirRelationExpr::global_get(
                    description.on_id,
                    on_type.clone(),
                )),
                keys: vec![description.key.clone()],
            }),
        );
        self.index_exports.insert(id, (description, on_type));
    }

    /// Exports as `id` a sink described by `description`.
    pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) {
        self.sink_exports.insert(id, description);
    }

    /// Returns true iff `id` is already imported or built by this dataflow.
    pub fn is_imported(&self, id: &GlobalId) -> bool {
        self.objects_to_build.iter().any(|bd| &bd.id == id)
            || self.index_imports.keys().any(|i| i == id)
            || self.source_imports.keys().any(|i| i == id)
    }

    /// The number of columns associated with an identifier in the dataflow.
    pub fn arity_of(&self, id: &GlobalId) -> usize {
        for (source_id, (source, _monotonic, _upper)) in self.source_imports.iter() {
            if source_id == id {
                return source.typ.arity();
            }
        }
        for IndexImport { desc, typ, .. } in self.index_imports.values() {
            if &desc.on_id == id {
                return typ.arity();
            }
        }
        for desc in self.objects_to_build.iter() {
            if &desc.id == id {
                return desc.plan.arity();
            }
        }
        panic!("GlobalId {} not found in DataflowDesc", id);
    }

    /// Calls `r` and `s` on any sub-members of those types in `self`. Halts at the first error return.
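    ///
    /// A hedged sketch (marked `ignore`; counting is illustrative):
    ///
    /// ```ignore
    /// // Count scalar expressions in the source imports' MFPs without failing.
    /// let count = std::cell::Cell::new(0usize);
    /// dataflow
    ///     .visit_children(
    ///         |_plan| Ok::<_, ()>(()),
    ///         |_expr| {
    ///             count.set(count.get() + 1);
    ///             Ok(())
    ///         },
    ///     )
    ///     .expect("closures never fail");
    /// ```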
    pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
    where
        R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
        S: Fn(&mut MirScalarExpr) -> Result<(), E>,
    {
        for BuildDesc { plan, .. } in &mut self.objects_to_build {
            r(plan)?;
        }
        for (source_instance_desc, _, _upper) in self.source_imports.values_mut() {
            let Some(mfp) = source_instance_desc.arguments.operators.as_mut() else {
                continue;
            };
            for expr in mfp.expressions.iter_mut() {
                s(expr)?;
            }
            for (_, expr) in mfp.predicates.iter_mut() {
                s(expr)?;
            }
        }
        Ok(())
    }
}

impl<P, S, T> DataflowDescription<P, S, T> {
    /// Creates a new dataflow description with a human-readable name.
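    ///
    /// A minimal sketch (marked `ignore`): the description starts empty and is
    /// populated via the `import_*`/`export_*`/`insert_plan` methods.
    ///
    /// ```ignore
    /// let dataflow = DataflowDesc::new("my_view".into());
    /// assert!(dataflow.objects_to_build.is_empty());
    /// assert!(dataflow.as_of.is_none());
    /// ```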
    pub fn new(name: String) -> Self {
        Self {
            source_imports: Default::default(),
            index_imports: Default::default(),
            objects_to_build: Vec::new(),
            index_exports: Default::default(),
            sink_exports: Default::default(),
            as_of: Default::default(),
            until: Antichain::new(),
            initial_storage_as_of: None,
            refresh_schedule: None,
            debug_name: name,
            time_dependence: None,
        }
    }

    /// Sets the `as_of` frontier to the supplied argument.
    ///
    /// This method allows the dataflow to indicate a frontier up through
    /// which all times should be advanced. This can be done for at least
    /// two reasons: 1. correctness and 2. performance.
    ///
    /// Correctness may require an `as_of` to ensure that historical detail
    /// is consolidated at representative times, so that we do not surface
    /// incidental times as if they were meaningful. For example, updates may
    /// be compacted to times that are no longer the source times, but instead
    /// some byproduct of when compaction was executed; we should not present
    /// those specific times as meaningfully different from other equivalent
    /// times.
    ///
    /// Performance may benefit from an aggressive `as_of`, as it reduces the
    /// number of distinct moments at which collections vary. Differential
    /// dataflow will refresh its outputs at each time its inputs change, and
    /// to moderate that we can minimize the volume of distinct input times
    /// as much as possible.
    ///
    /// Generally, one should consider setting `as_of` at least to the `since`
    /// frontiers of contributing data sources and as aggressively as the
    /// computation permits.
    pub fn set_as_of(&mut self, as_of: Antichain<T>) {
        self.as_of = Some(as_of);
    }

    /// Records the initial `as_of` of the storage collection associated with a materialized view.
    pub fn set_initial_as_of(&mut self, initial_as_of: Antichain<T>) {
        self.initial_storage_as_of = Some(initial_as_of);
    }

    /// Identifiers of imported objects (indexes and sources).
    pub fn import_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.imported_index_ids().chain(self.imported_source_ids())
    }

    /// Identifiers of imported indexes.
    pub fn imported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.index_imports.keys().copied()
    }

    /// Identifiers of imported sources.
    pub fn imported_source_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.source_imports.keys().copied()
    }

    /// Identifiers of exported objects (indexes and sinks).
    pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.exported_index_ids().chain(self.exported_sink_ids())
    }

    /// Identifiers of exported indexes.
    pub fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.index_exports.keys().copied()
    }

    /// Identifiers of exported sinks.
    pub fn exported_sink_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.sink_exports.keys().copied()
    }

    /// Identifiers of exported persist sinks.
    pub fn persist_sink_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::MaterializedView(_) => Some(*id),
                ComputeSinkConnection::ContinualTask(_) => Some(*id),
                _ => None,
            })
    }

    /// Identifiers of exported subscribe sinks.
    pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::Subscribe(_) => Some(*id),
                _ => None,
            })
    }

    /// Identifiers of exported continual tasks.
    pub fn continual_task_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::ContinualTask(_) => Some(*id),
                _ => None,
            })
    }

    /// Identifiers of exported copy-to sinks.
    pub fn copy_to_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::CopyToS3Oneshot(_) => Some(*id),
                _ => None,
            })
    }

    /// Produce a `Display`able value containing the import IDs of this dataflow.
    pub fn display_import_ids(&self) -> impl fmt::Display + '_ {
        use mz_ore::str::{bracketed, separated};
        bracketed("[", "]", separated(", ", self.import_ids()))
    }

    /// Produce a `Display`able value containing the export IDs of this dataflow.
    pub fn display_export_ids(&self) -> impl fmt::Display + '_ {
        use mz_ore::str::{bracketed, separated};
        bracketed("[", "]", separated(", ", self.export_ids()))
    }

    /// Whether this dataflow installs transient collections.
    pub fn is_transient(&self) -> bool {
        self.export_ids().all(|id| id.is_transient())
    }

    /// Returns the description of the object to build with the specified
    /// identifier.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not present in `objects_to_build` exactly once.
    pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
        let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
        let build = builds
            .next()
            .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
        assert!(builds.next().is_none());
        build
    }

    /// Returns the id of the dataflow's sink export.
    ///
    /// # Panics
    ///
    /// Panics if the dataflow has no sink exports or has more than one.
    pub fn sink_id(&self) -> GlobalId {
        let sink_exports = &self.sink_exports;
        let sink_id = sink_exports.keys().into_element();
        *sink_id
    }
}

impl<P, S, T> DataflowDescription<P, S, T>
where
    P: CollectionPlan,
{
    /// Computes the set of identifiers upon which the specified collection
    /// identifier depends.
    ///
    /// `collection_id` must specify a valid object in `objects_to_build`.
    ///
    /// This method includes identifiers for e.g. intermediate views, and should be filtered
    /// if one only wants sources and indexes.
    ///
    /// This method is safe for mutually recursive view definitions.
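    ///
    /// A hedged sketch (marked `ignore`; `view_id` is illustrative):
    ///
    /// ```ignore
    /// // Transitive dependencies of `view_id`, including `view_id` itself.
    /// let deps: BTreeSet<GlobalId> = dataflow.depends_on(view_id);
    /// // Restricted to the sources and indexes imported by the dataflow.
    /// let import_deps = dataflow.depends_on_imports(view_id);
    /// assert!(import_deps.is_subset(&deps));
    /// ```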
    pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
        let mut out = BTreeSet::new();
        self.depends_on_into(collection_id, &mut out);
        out
    }

    /// Like `depends_on`, but appends to an existing `BTreeSet`.
    pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
        out.insert(collection_id);
        if self.source_imports.contains_key(&collection_id) {
            // The collection is provided by an imported source. Report the
            // dependency on the source.
            out.insert(collection_id);
            return;
        }

        // NOTE(benesch): we're not smart enough here to know *which* index
        // for the collection will be used, if one exists, so we have to report
        // the dependency on all of them.
        let mut found_index = false;
        for (index_id, IndexImport { desc, .. }) in &self.index_imports {
            if desc.on_id == collection_id {
                // The collection is provided by an imported index. Report the
                // dependency on the index.
                out.insert(*index_id);
                found_index = true;
            }
        }
        if found_index {
            return;
        }

        // The collection is not provided by a source or imported index.
        // It must be a collection whose plan we have handy. Recurse.
        let build = self.build_desc(collection_id);
        for id in build.plan.depends_on() {
            if !out.contains(&id) {
                self.depends_on_into(id, out)
            }
        }
    }

    /// Computes the set of imports upon which the specified collection depends.
    ///
    /// This method behaves like `depends_on` but filters out internal dependencies that are not
    /// included in the dataflow imports.
    pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
        let is_import = |id: &GlobalId| {
            self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
        };

        let deps = self.depends_on(collection_id);
        deps.into_iter().filter(is_import).collect()
    }
}

impl<S, T> DataflowDescription<RenderPlan, S, T>
where
    S: Clone + PartialEq,
    T: Clone + timely::PartialOrder,
{
    /// Determines whether another dataflow description is compatible with this one.
    ///
    /// Compatible dataflows have structurally equal exports, imports, and objects to build. The
    /// `as_of` of the receiver has to be less than or equal to the `as_of` of `other`.
    ///
    /// Note that this method performs normalization as part of the structural equality checking,
    /// which involves cloning both `self` and `other`. It is therefore relatively expensive and
    /// should only be used on cold code paths.
    ///
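    /// A hedged sketch (marked `ignore`; `old_dataflow` and `new_dataflow` are
    /// illustrative):
    ///
    /// ```ignore
    /// // During command reconciliation, keep a running dataflow if the new
    /// // description is structurally equal and its `as_of` has not regressed.
    /// if old_dataflow.compatible_with(&new_dataflow) {
    ///     // ... reuse `old_dataflow` instead of rebuilding it ...
    /// }
    /// ```
    ///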
    // TODO: The semantics of this function are only useful for command reconciliation at the moment.
    pub fn compatible_with(&self, other: &Self) -> bool {
        let old = self.as_comparable();
        let new = other.as_comparable();

        let equality = old.index_exports == new.index_exports
            && old.sink_exports == new.sink_exports
            && old.objects_to_build == new.objects_to_build
            && old.index_imports == new.index_imports
            && old.source_imports == new.source_imports
            && old.time_dependence == new.time_dependence;

        let partial = if let (Some(old_as_of), Some(new_as_of)) = (&old.as_of, &new.as_of) {
            timely::PartialOrder::less_equal(old_as_of, new_as_of)
        } else {
            false
        };

        equality && partial
    }

    /// Returns a `DataflowDescription` that has the same structure as `self` and can be
    /// structurally compared to other `DataflowDescription`s.
    ///
    /// The function normalizes several properties. It replaces transient `GlobalId`s
    /// that are only used internally (i.e. not imported nor exported) with consecutive IDs
    /// starting from `t1`. It replaces the source import's `upper` by a dummy value.
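    ///
    /// For example (illustrative IDs): internal transient IDs `t42` and `t17` in two
    /// otherwise identical dataflows are both renumbered to `t1`, `t2`, ... in the
    /// order they are encountered, so the two descriptions compare as equal.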
    fn as_comparable(&self) -> Self {
        let external_ids: BTreeSet<_> = self.import_ids().chain(self.export_ids()).collect();

        let mut id_counter = 0;
        let mut replacements = BTreeMap::new();

        let mut maybe_replace = |id: GlobalId| {
            if id.is_transient() && !external_ids.contains(&id) {
                *replacements.entry(id).or_insert_with(|| {
                    id_counter += 1;
                    GlobalId::Transient(id_counter)
                })
            } else {
                id
            }
        };

        let mut source_imports = self.source_imports.clone();
        for (_source, _monotonic, upper) in source_imports.values_mut() {
            *upper = Antichain::new();
        }

        let mut objects_to_build = self.objects_to_build.clone();
        for object in &mut objects_to_build {
            object.id = maybe_replace(object.id);
            object.plan.replace_ids(&mut maybe_replace);
        }

        let mut index_exports = self.index_exports.clone();
        for (desc, _typ) in index_exports.values_mut() {
            desc.on_id = maybe_replace(desc.on_id);
        }

        let mut sink_exports = self.sink_exports.clone();
        for desc in sink_exports.values_mut() {
            desc.from = maybe_replace(desc.from);
        }

        DataflowDescription {
            source_imports,
            index_imports: self.index_imports.clone(),
            objects_to_build,
            index_exports,
            sink_exports,
            as_of: self.as_of.clone(),
            until: self.until.clone(),
            initial_storage_as_of: self.initial_storage_as_of.clone(),
            refresh_schedule: self.refresh_schedule.clone(),
            debug_name: self.debug_name.clone(),
            time_dependence: self.time_dependence.clone(),
        }
    }
}

/// A commonly used name for dataflows containing MIR expressions.
pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;

/// An index storing processed updates so they can be queried
/// or reused in other computations.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
    /// Identity of the collection the index is on.
    pub on_id: GlobalId,
    /// Expressions to be arranged, in order of decreasing primacy.
    pub key: Vec<MirScalarExpr>,
}

/// Information about an imported index, and how it will be used by the dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexImport {
    /// Description of index.
    pub desc: IndexDesc,
    /// Schema and keys of the object the index is on.
    pub typ: SqlRelationType,
    /// Whether the index will supply monotonic data.
    pub monotonic: bool,
}

/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
    /// TODO(database-issues#7533): Add documentation.
    pub id: GlobalId,
    /// TODO(database-issues#7533): Add documentation.
    pub plan: P,
}