mz_compute_types/dataflows.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types for describing dataflows.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;

use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
use mz_ore::soft_assert_or_log;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{GlobalId, SqlRelationType};
use mz_storage_types::time_dependence::TimeDependence;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;

use crate::plan::Plan;
use crate::plan::render_plan::RenderPlan;
use crate::sinks::{ComputeSinkConnection, ComputeSinkDesc};
use crate::sources::{SourceInstanceArguments, SourceInstanceDesc};

/// A description of a dataflow to construct and results to surface.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
    /// Source instantiations made available to the dataflow, paired with
    /// monotonicity information and an upper frontier.
    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>)>,
    /// Indexes made available to the dataflow.
    /// (id of index, import)
    pub index_imports: BTreeMap<GlobalId, IndexImport>,
    /// Views and indexes to be built and stored in the local context.
    /// Objects must be built in the listed order, as later objects may
    /// depend on identifiers built earlier.
    pub objects_to_build: Vec<BuildDesc<P>>,
    /// Indexes to be made available for sharing with other dataflows.
    /// (id of new index, description of index, relation type of base source/view/table)
    pub index_exports: BTreeMap<GlobalId, (IndexDesc, SqlRelationType)>,
    /// Sinks to be created.
    /// (id of new sink, description of sink)
    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
    /// An optional frontier to which inputs should be advanced.
    ///
    /// If this is set, it should override the default setting determined by
    /// the upper bound of the `since` frontiers contributing to the dataflow.
    /// It is an error for this to be set to a frontier not beyond that default.
    pub as_of: Option<Antichain<T>>,
    /// Frontier beyond which the dataflow should not execute.
    /// Specifically, updates at times greater than or equal to this frontier are suppressed.
    /// This is often set to `as_of + 1` to enable "batch" computations.
    /// Note that frontier advancements might still happen to times after the `until`;
    /// only data is suppressed. (This is consistent with how frontier advancements can also
    /// happen before the `as_of`.)
    pub until: Antichain<T>,
    /// The initial `as_of` from when the collection was first created. Filled only for
    /// materialized views. Note that this does not change upon restarts.
    pub initial_storage_as_of: Option<Antichain<T>>,
    /// The schedule of `REFRESH` materialized views.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Human-readable name.
    pub debug_name: String,
    /// Description of how the dataflow's progress relates to wall-clock time. `None` for unknown.
    pub time_dependence: Option<TimeDependence>,
}

impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
    /// Tests whether the dataflow refers to a single timestamp: either `as_of`
    /// has a single coordinate and `until` corresponds to that coordinate plus
    /// one, or `as_of` is the maximum timestamp and is thus trivially single.
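    ///
    /// A minimal sketch of the expected relationship, for illustration only
    /// (not compiled here; the timestamps are assumptions):
    ///
    /// ```ignore
    /// let mut df = DataflowDescription::<(), ()>::new("example".into());
    /// df.set_as_of(Antichain::from_elem(mz_repr::Timestamp::from(5u64)));
    /// df.until = Antichain::from_elem(mz_repr::Timestamp::from(6u64));
    /// assert!(df.is_single_time()); // `until` is `as_of + 1`
    /// ```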
    pub fn is_single_time(&self) -> bool {
        // TODO: this would be much easier to check if `until` was a strict lower bound,
        // and we would be testing that `until == as_of`.

        let until = &self.until;

        // If `as_of` is not set at all, this can't be a single-time dataflow.
        let Some(as_of) = self.as_of.as_ref() else {
            return false;
        };
        // Ensure that `as_of ≤ until`.
        soft_assert_or_log!(
            timely::PartialOrder::less_equal(as_of, until),
            "expected `as_of ≤ until`, got `{as_of:?} ≰ {until:?}`",
        );
        // If `as_of` is not a single timestamp, this can't be a single-time dataflow.
        let Some(as_of) = as_of.as_option() else {
            return false;
        };
        // Ensure that `as_of = MAX` implies `until.is_empty()`.
        soft_assert_or_log!(
            as_of != &mz_repr::Timestamp::MAX || until.is_empty(),
            "expected `until = {{}}` due to `as_of = MAX`, got `until = {until:?}`",
        );
        // Note that the `(as_of = MAX, until = {})` case also returns `true`
        // here (as expected) since we are going to compare two `None` values.
        as_of.try_step_forward().as_ref() == until.as_option()
    }
}

impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
    /// Check invariants expected to be true about `DataflowDescription`s.
    pub fn check_invariants(&self) -> Result<(), String> {
        let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect();
        let mut lir_ids = BTreeSet::new();

        while let Some(plan) = plans.pop() {
            let lir_id = plan.lir_id;
            if !lir_ids.insert(lir_id) {
                return Err(format!(
                    "duplicate `LirId` in `DataflowDescription`: {lir_id}"
                ));
            }
            plans.extend(plan.node.children());
        }

        Ok(())
    }
}

impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
    /// Imports a previously exported index.
    ///
    /// This method makes available an index previously exported as `id`, identified
    /// to the query by `desc` (which names the view the index arranges, and the
    /// keys by which it is arranged).
    pub fn import_index(
        &mut self,
        id: GlobalId,
        desc: IndexDesc,
        typ: SqlRelationType,
        monotonic: bool,
    ) {
        self.index_imports.insert(
            id,
            IndexImport {
                desc,
                typ,
                monotonic,
            },
        );
    }

    /// Imports a source and makes it available as `id`.
    pub fn import_source(&mut self, id: GlobalId, typ: SqlRelationType, monotonic: bool) {
        // Import the source with no linear operators applied to it.
        // They may be populated by whole-dataflow optimization.
        self.source_imports.insert(
            id,
            (
                SourceInstanceDesc {
                    storage_metadata: (),
                    arguments: SourceInstanceArguments { operators: None },
                    typ,
                },
                monotonic,
                Antichain::new(),
            ),
        );
    }

    /// Binds to `id` the relation expression `plan`.
    pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
        self.objects_to_build.push(BuildDesc { id, plan });
    }

    /// Exports as `id` an index described by `description`.
    ///
    /// Future uses of `import_index` in other dataflow descriptions may use `id`,
    /// as long as this dataflow has not been terminated in the meantime.
    pub fn export_index(&mut self, id: GlobalId, description: IndexDesc, on_type: SqlRelationType) {
        // We first create a "view" named `id` that ensures that the
        // data are correctly arranged and available for export.
        self.insert_plan(
            id,
            OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
                input: Box::new(MirRelationExpr::global_get(
                    description.on_id,
                    on_type.clone(),
                )),
                keys: vec![description.key.clone()],
            }),
        );
        self.index_exports.insert(id, (description, on_type));
    }

    /// Exports as `id` a sink described by `description`.
    pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) {
        self.sink_exports.insert(id, description);
    }

    /// Returns true iff `id` is already imported or built by this dataflow.
    pub fn is_imported(&self, id: &GlobalId) -> bool {
        self.objects_to_build.iter().any(|bd| &bd.id == id)
            || self.index_imports.keys().any(|i| i == id)
            || self.source_imports.keys().any(|i| i == id)
    }

    /// The number of columns associated with an identifier in the dataflow.
    pub fn arity_of(&self, id: &GlobalId) -> usize {
        for (source_id, (source, _monotonic, _upper)) in self.source_imports.iter() {
            if source_id == id {
                return source.typ.arity();
            }
        }
        for IndexImport { desc, typ, .. } in self.index_imports.values() {
            if &desc.on_id == id {
                return typ.arity();
            }
        }
        for desc in self.objects_to_build.iter() {
            if &desc.id == id {
                return desc.plan.arity();
            }
        }
        panic!("GlobalId {} not found in DataflowDesc", id);
    }

    /// Calls `r` on every relation expression and `s` on every scalar expression
    /// contained in `self`, halting at the first error return.
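    ///
    /// A hedged usage sketch (not compiled here; the no-op closures are
    /// illustrative assumptions, not part of the API):
    ///
    /// ```ignore
    /// // Visit all plans and source MFP expressions without modifying them.
    /// dataflow.visit_children::<_, _, ()>(|_relation| Ok(()), |_scalar| Ok(()))?;
    /// ```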
    pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
    where
        R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
        S: Fn(&mut MirScalarExpr) -> Result<(), E>,
    {
        for BuildDesc { plan, .. } in &mut self.objects_to_build {
            r(plan)?;
        }
        for (source_instance_desc, _, _upper) in self.source_imports.values_mut() {
            let Some(mfp) = source_instance_desc.arguments.operators.as_mut() else {
                continue;
            };
            for expr in mfp.expressions.iter_mut() {
                s(expr)?;
            }
            for (_, expr) in mfp.predicates.iter_mut() {
                s(expr)?;
            }
        }
        Ok(())
    }
}

impl<P, S, T> DataflowDescription<P, S, T> {
    /// Creates a new dataflow description with a human-readable name.
    pub fn new(name: String) -> Self {
        Self {
            source_imports: Default::default(),
            index_imports: Default::default(),
            objects_to_build: Vec::new(),
            index_exports: Default::default(),
            sink_exports: Default::default(),
            as_of: Default::default(),
            until: Antichain::new(),
            initial_storage_as_of: None,
            refresh_schedule: None,
            debug_name: name,
            time_dependence: None,
        }
    }

    /// Sets the `as_of` frontier to the supplied argument.
    ///
    /// This method allows the dataflow to indicate a frontier up through
    /// which all times should be advanced. This can be done for at least
    /// two reasons: 1. correctness and 2. performance.
    ///
    /// Correctness may require an `as_of` to ensure that historical detail
    /// is consolidated at representative times that do not expose incidental
    /// detail. For example, updates may be compacted to times that are no
    /// longer the source times, but instead some byproduct of when compaction
    /// was executed; we should not present those specific times as
    /// meaningfully different from other equivalent times.
    ///
    /// Performance may benefit from an aggressive `as_of`, as it reduces the
    /// number of distinct moments at which collections vary. Differential
    /// dataflow will refresh its outputs at each time its inputs change; to
    /// moderate that, we can minimize the volume of distinct input times
    /// as much as possible.
    ///
    /// Generally, one should consider setting `as_of` at least to the `since`
    /// frontiers of contributing data sources, and as aggressively as the
    /// computation permits.
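    ///
    /// A minimal sketch (not compiled here; the input `since` frontiers are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// // Advance the dataflow to the join (least upper bound) of the input
    /// // `since` frontiers.
    /// let since_a = Antichain::from_elem(mz_repr::Timestamp::from(10u64));
    /// let since_b = Antichain::from_elem(mz_repr::Timestamp::from(20u64));
    /// let mut as_of = since_a.clone();
    /// as_of.join_assign(&since_b); // requires `differential_dataflow::lattice::Lattice`
    /// dataflow.set_as_of(as_of); // as_of = {20}
    /// ```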
    pub fn set_as_of(&mut self, as_of: Antichain<T>) {
        self.as_of = Some(as_of);
    }

    /// Records the initial `as_of` of the storage collection associated with a materialized view.
    pub fn set_initial_as_of(&mut self, initial_as_of: Antichain<T>) {
        self.initial_storage_as_of = Some(initial_as_of);
    }

    /// Identifiers of imported objects (indexes and sources).
    pub fn import_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.imported_index_ids().chain(self.imported_source_ids())
    }

    /// Identifiers of imported indexes.
    pub fn imported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.index_imports.keys().copied()
    }

    /// Identifiers of imported sources.
    pub fn imported_source_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.source_imports.keys().copied()
    }

    /// Identifiers of exported objects (indexes and sinks).
    pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.exported_index_ids().chain(self.exported_sink_ids())
    }

    /// Identifiers of exported indexes.
    pub fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.index_exports.keys().copied()
    }

    /// Identifiers of exported sinks.
    pub fn exported_sink_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.sink_exports.keys().copied()
    }

    /// Identifiers of exported persist sinks.
    pub fn persist_sink_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::MaterializedView(_) => Some(*id),
                ComputeSinkConnection::ContinualTask(_) => Some(*id),
                _ => None,
            })
    }

    /// Identifiers of exported subscribe sinks.
    pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::Subscribe(_) => Some(*id),
                _ => None,
            })
    }

    /// Identifiers of exported continual tasks.
    pub fn continual_task_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::ContinualTask(_) => Some(*id),
                _ => None,
            })
    }

    /// Identifiers of exported `COPY TO` sinks.
    pub fn copy_to_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::CopyToS3Oneshot(_) => Some(*id),
                _ => None,
            })
    }

    /// Produce a `Display`able value containing the import IDs of this dataflow.
    pub fn display_import_ids(&self) -> impl fmt::Display + '_ {
        use mz_ore::str::{bracketed, separated};
        bracketed("[", "]", separated(", ", self.import_ids()))
    }

    /// Produce a `Display`able value containing the export IDs of this dataflow.
    pub fn display_export_ids(&self) -> impl fmt::Display + '_ {
        use mz_ore::str::{bracketed, separated};
        bracketed("[", "]", separated(", ", self.export_ids()))
    }

    /// Whether this dataflow installs transient collections.
    pub fn is_transient(&self) -> bool {
        self.export_ids().all(|id| id.is_transient())
    }

    /// Returns the description of the object to build with the specified
    /// identifier.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not present in `objects_to_build` exactly once.
    pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
        let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
        let build = builds
            .next()
            .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
        assert!(builds.next().is_none());
        build
    }
}

impl<P, S, T> DataflowDescription<P, S, T>
where
    P: CollectionPlan,
{
    /// Computes the set of identifiers upon which the specified collection
    /// identifier depends.
    ///
    /// `collection_id` must specify a valid object in `objects_to_build`.
    ///
    /// This method includes identifiers for, e.g., intermediate views; its result
    /// should be filtered if one only wants sources and indexes.
    ///
    /// This method is safe for mutually recursive view definitions.
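    ///
    /// A hedged sketch of the expected shape (not compiled here; the IDs are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// // For a view `t2` built over an imported source `u1`, the result
    /// // includes both the view itself and the transitive source dependency.
    /// let deps = dataflow.depends_on(GlobalId::Transient(2));
    /// assert!(deps.contains(&GlobalId::Transient(2)));
    /// assert!(deps.contains(&GlobalId::User(1)));
    /// ```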
    pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
        let mut out = BTreeSet::new();
        self.depends_on_into(collection_id, &mut out);
        out
    }

    /// Like `depends_on`, but appends to an existing `BTreeSet`.
    pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
        out.insert(collection_id);
        if self.source_imports.contains_key(&collection_id) {
            // The collection is provided by an imported source, which we have
            // already recorded as a dependency above.
            return;
        }

        // NOTE(benesch): we're not smart enough here to know *which* index
        // for the collection will be used, if one exists, so we have to report
        // the dependency on all of them.
        let mut found_index = false;
        for (index_id, IndexImport { desc, .. }) in &self.index_imports {
            if desc.on_id == collection_id {
                // The collection is provided by an imported index. Report the
                // dependency on the index.
                out.insert(*index_id);
                found_index = true;
            }
        }
        if found_index {
            return;
        }

        // The collection is not provided by a source or imported index.
        // It must be a collection whose plan we have handy. Recurse.
        let build = self.build_desc(collection_id);
        for id in build.plan.depends_on() {
            if !out.contains(&id) {
                self.depends_on_into(id, out)
            }
        }
    }

    /// Computes the set of imports upon which the specified collection depends.
    ///
    /// This method behaves like `depends_on`, but filters out internal dependencies
    /// that are not included in the dataflow imports.
    pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
        let is_import = |id: &GlobalId| {
            self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
        };

        let deps = self.depends_on(collection_id);
        deps.into_iter().filter(is_import).collect()
    }
}

impl<S, T> DataflowDescription<RenderPlan, S, T>
where
    S: Clone + PartialEq,
    T: Clone + timely::PartialOrder,
{
    /// Determine if a dataflow description is compatible with this dataflow description.
    ///
    /// Compatible dataflows have structurally equal exports, imports, and objects to build.
    /// The `as_of` of the receiver must be less than or equal to the `as_of` of `other`.
    ///
    /// Note that this method performs normalization as part of the structural equality checking,
    /// which involves cloning both `self` and `other`. It is therefore relatively expensive and
    /// should only be used on cold code paths.
    ///
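    /// A hedged usage sketch (not compiled here; `old` and `new` are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// // During reconciliation, keep an installed dataflow only if the new
    /// // description is structurally equal and its `as_of` has not regressed.
    /// if old.compatible_with(&new) {
    ///     // Reuse the running dataflow.
    /// }
    /// ```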
    // TODO: The semantics of this function are only useful for command reconciliation at the moment.
    pub fn compatible_with(&self, other: &Self) -> bool {
        let old = self.as_comparable();
        let new = other.as_comparable();

        let equality = old.index_exports == new.index_exports
            && old.sink_exports == new.sink_exports
            && old.objects_to_build == new.objects_to_build
            && old.index_imports == new.index_imports
            && old.source_imports == new.source_imports
            && old.time_dependence == new.time_dependence;

        let partial = if let (Some(old_as_of), Some(new_as_of)) = (&old.as_of, &new.as_of) {
            timely::PartialOrder::less_equal(old_as_of, new_as_of)
        } else {
            false
        };

        equality && partial
    }

    /// Returns a `DataflowDescription` that has the same structure as `self` and can be
    /// structurally compared to other `DataflowDescription`s.
    ///
    /// The function normalizes several properties. It replaces transient `GlobalId`s
    /// that are only used internally (i.e., neither imported nor exported) with
    /// consecutive IDs starting from `t1`, and replaces each source import's `upper`
    /// with a dummy value.
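    ///
    /// A hedged sketch of the normalization (not compiled here; the IDs are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// // Two descriptions that differ only in internal transient IDs (say,
    /// // one builds `t17` and the other `t42`) normalize to the same IDs
    /// // (`t1`, `t2`, ...) and hence compare equal.
    /// assert_eq!(df_a.as_comparable(), df_b.as_comparable());
    /// ```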
    fn as_comparable(&self) -> Self {
        let external_ids: BTreeSet<_> = self.import_ids().chain(self.export_ids()).collect();

        let mut id_counter = 0;
        let mut replacements = BTreeMap::new();

        let mut maybe_replace = |id: GlobalId| {
            if id.is_transient() && !external_ids.contains(&id) {
                *replacements.entry(id).or_insert_with(|| {
                    id_counter += 1;
                    GlobalId::Transient(id_counter)
                })
            } else {
                id
            }
        };

        let mut source_imports = self.source_imports.clone();
        for (_source, _monotonic, upper) in source_imports.values_mut() {
            *upper = Antichain::new();
        }

        let mut objects_to_build = self.objects_to_build.clone();
        for object in &mut objects_to_build {
            object.id = maybe_replace(object.id);
            object.plan.replace_ids(&mut maybe_replace);
        }

        let mut index_exports = self.index_exports.clone();
        for (desc, _typ) in index_exports.values_mut() {
            desc.on_id = maybe_replace(desc.on_id);
        }

        let mut sink_exports = self.sink_exports.clone();
        for desc in sink_exports.values_mut() {
            desc.from = maybe_replace(desc.from);
        }

        DataflowDescription {
            source_imports,
            index_imports: self.index_imports.clone(),
            objects_to_build,
            index_exports,
            sink_exports,
            as_of: self.as_of.clone(),
            until: self.until.clone(),
            initial_storage_as_of: self.initial_storage_as_of.clone(),
            refresh_schedule: self.refresh_schedule.clone(),
            debug_name: self.debug_name.clone(),
            time_dependence: self.time_dependence.clone(),
        }
    }
}

/// A commonly used name for dataflows containing MIR expressions.
pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;

/// An index storing processed updates so they can be queried
/// or reused in other computations.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
    /// Identity of the collection the index is on.
    pub on_id: GlobalId,
    /// Expressions to be arranged, in order of decreasing primacy.
    pub key: Vec<MirScalarExpr>,
}

/// Information about an imported index, and how it will be used by the dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexImport {
    /// Description of the index.
    pub desc: IndexDesc,
    /// Schema and keys of the object the index is on.
    pub typ: SqlRelationType,
    /// Whether the index will supply monotonic data.
    pub monotonic: bool,
}

/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
    /// TODO(database-issues#7533): Add documentation.
    pub id: GlobalId,
    /// TODO(database-issues#7533): Add documentation.
    pub plan: P,
}
596}