// mz_compute_types/dataflows.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types for describing dataflows.
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14
15use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
16use mz_ore::collections::CollectionExt;
17use mz_ore::soft_assert_or_log;
18use mz_repr::refresh_schedule::RefreshSchedule;
19use mz_repr::{GlobalId, ReprRelationType, SqlRelationType, Timestamp};
20use mz_storage_types::time_dependence::TimeDependence;
21use serde::{Deserialize, Serialize};
22use timely::progress::Antichain;
23
24use crate::plan::Plan;
25use crate::plan::render_plan::RenderPlan;
26use crate::sinks::{ComputeSinkConnection, ComputeSinkDesc};
27use crate::sources::{SourceInstanceArguments, SourceInstanceDesc};
28
/// A description of a dataflow to construct and results to surface.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = ()> {
    /// Source instantiations made available to the dataflow, paired with
    /// monotonicity information.
    pub source_imports: BTreeMap<GlobalId, SourceImport<S>>,
    /// Indexes made available to the dataflow.
    /// (id of index, import)
    pub index_imports: BTreeMap<GlobalId, IndexImport>,
    /// Views and indexes to be built and stored in the local context.
    /// Objects must be built in the specific order, as there may be
    /// dependencies of later objects on prior identifiers.
    pub objects_to_build: Vec<BuildDesc<P>>,
    /// Indexes to be made available to be shared with other dataflows.
    /// (id of new index, description of index, relation type of base source/view/table)
    pub index_exports: BTreeMap<GlobalId, (IndexDesc, ReprRelationType)>,
    /// Sinks to be created.
    /// (id of new sink, description of sink)
    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S>>,
    /// An optional frontier to which inputs should be advanced.
    ///
    /// If this is set, it should override the default setting determined by
    /// the upper bound of `since` frontiers contributing to the dataflow.
    /// It is an error for this to be set to a frontier not beyond that default.
    pub as_of: Option<Antichain<Timestamp>>,
    /// Frontier beyond which the dataflow should not execute.
    /// Specifically, updates at times greater or equal to this frontier are suppressed.
    /// This is often set to `as_of + 1` to enable "batch" computations.
    /// Note that frontier advancements might still happen to times that are after the `until`,
    /// only data is suppressed. (This is consistent with how frontier advancements can also
    /// happen before the `as_of`.)
    pub until: Antichain<Timestamp>,
    /// The initial as_of when the collection is first created. Filled only for materialized views.
    /// Note that this doesn't change upon restarts.
    pub initial_storage_as_of: Option<Antichain<Timestamp>>,
    /// The schedule of REFRESH materialized views.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Human-readable name.
    pub debug_name: String,
    /// Description of how the dataflow's progress relates to wall-clock time. None for unknown.
    pub time_dependence: Option<TimeDependence>,
}
70
71impl<P, S> DataflowDescription<P, S> {
72 /// Tests if the dataflow refers to a single timestamp, namely
73 /// that `as_of` has a single coordinate and that the `until`
74 /// value corresponds to the `as_of` value plus one, or `as_of`
75 /// is the maximum timestamp and is thus single.
76 pub fn is_single_time(&self) -> bool {
77 // TODO: this would be much easier to check if `until` was a strict lower bound,
78 // and we would be testing that `until == as_of`.
79
80 let until = &self.until;
81
82 // IF `as_of` is not set at all this can't be a single time dataflow.
83 let Some(as_of) = self.as_of.as_ref() else {
84 return false;
85 };
86 // Ensure that as_of <= until.
87 soft_assert_or_log!(
88 timely::PartialOrder::less_equal(as_of, until),
89 "expected empty `as_of ≤ until`, got `{as_of:?} ≰ {until:?}`",
90 );
91 // IF `as_of` is not a single timestamp this can't be a single time dataflow.
92 let Some(as_of) = as_of.as_option() else {
93 return false;
94 };
95 // Ensure that `as_of = MAX` implies `until.is_empty()`.
96 soft_assert_or_log!(
97 as_of != &mz_repr::Timestamp::MAX || until.is_empty(),
98 "expected `until = {{}}` due to `as_of = MAX`, got `until = {until:?}`",
99 );
100 // Note that the `(as_of = MAX, until = {})` case also returns `true`
101 // here (as expected) since we are going to compare two `None` values.
102 as_of.try_step_forward().as_ref() == until.as_option()
103 }
104}
105
106impl DataflowDescription<Plan, ()> {
107 /// Check invariants expected to be true about `DataflowDescription`s.
108 pub fn check_invariants(&self) -> Result<(), String> {
109 let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect();
110 let mut lir_ids = BTreeSet::new();
111
112 while let Some(plan) = plans.pop() {
113 let lir_id = plan.lir_id;
114 if !lir_ids.insert(lir_id) {
115 return Err(format!(
116 "duplicate `LirId` in `DataflowDescription`: {lir_id}"
117 ));
118 }
119 plans.extend(plan.node.children());
120 }
121
122 Ok(())
123 }
124}
125
126impl DataflowDescription<OptimizedMirRelationExpr, ()> {
127 /// Imports a previously exported index.
128 ///
129 /// This method makes available an index previously exported as `id`, identified
130 /// to the query by `description` (which names the view the index arranges, and
131 /// the keys by which it is arranged).
132 pub fn import_index(
133 &mut self,
134 id: GlobalId,
135 desc: IndexDesc,
136 typ: ReprRelationType,
137 monotonic: bool,
138 ) {
139 self.index_imports.insert(
140 id,
141 IndexImport {
142 desc,
143 typ,
144 monotonic,
145 with_snapshot: true,
146 },
147 );
148 }
149
150 /// Imports a source and makes it available as `id`.
151 pub fn import_source(&mut self, id: GlobalId, typ: SqlRelationType, monotonic: bool) {
152 // Import the source with no linear operators applied to it.
153 // They may be populated by whole-dataflow optimization.
154 // Similarly, we require the snapshot by default, though optimization may choose to skip it.
155 self.source_imports.insert(
156 id,
157 SourceImport {
158 desc: SourceInstanceDesc {
159 storage_metadata: (),
160 arguments: SourceInstanceArguments { operators: None },
161 typ,
162 },
163 monotonic,
164 with_snapshot: true,
165 upper: Antichain::new(),
166 },
167 );
168 }
169
170 /// Binds to `id` the relation expression `plan`.
171 pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
172 self.objects_to_build.push(BuildDesc { id, plan });
173 }
174
175 /// Exports as `id` an index described by `description`.
176 ///
177 /// Future uses of `import_index` in other dataflow descriptions may use `id`,
178 /// as long as this dataflow has not been terminated in the meantime.
179 pub fn export_index(
180 &mut self,
181 id: GlobalId,
182 description: IndexDesc,
183 on_type: ReprRelationType,
184 ) {
185 // We first create a "view" named `id` that ensures that the
186 // data are correctly arranged and available for export.
187 self.insert_plan(
188 id,
189 OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
190 input: Box::new(MirRelationExpr::global_get(
191 description.on_id,
192 on_type.clone(),
193 )),
194 keys: vec![description.key.clone()],
195 }),
196 );
197 self.index_exports.insert(id, (description, on_type));
198 }
199
200 /// Exports as `id` a sink described by `description`.
201 pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<()>) {
202 self.sink_exports.insert(id, description);
203 }
204
205 /// Returns true iff `id` is already imported.
206 pub fn is_imported(&self, id: &GlobalId) -> bool {
207 self.objects_to_build.iter().any(|bd| &bd.id == id)
208 || self.index_imports.keys().any(|i| i == id)
209 || self.source_imports.keys().any(|i| i == id)
210 }
211
212 /// The number of columns associated with an identifier in the dataflow.
213 pub fn arity_of(&self, id: &GlobalId) -> usize {
214 for (source_id, source_import) in self.source_imports.iter() {
215 let source = &source_import.desc;
216 if source_id == id {
217 return source.typ.arity();
218 }
219 }
220 for IndexImport { desc, typ, .. } in self.index_imports.values() {
221 if &desc.on_id == id {
222 return typ.arity();
223 }
224 }
225 for desc in self.objects_to_build.iter() {
226 if &desc.id == id {
227 return desc.plan.arity();
228 }
229 }
230 panic!("GlobalId {} not found in DataflowDesc", id);
231 }
232
233 /// Calls r and s on any sub-members of those types in self. Halts at the first error return.
234 pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
235 where
236 R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
237 S: Fn(&mut MirScalarExpr) -> Result<(), E>,
238 {
239 for BuildDesc { plan, .. } in &mut self.objects_to_build {
240 r(plan)?;
241 }
242 for source_import in self.source_imports.values_mut() {
243 let Some(mfp) = source_import.desc.arguments.operators.as_mut() else {
244 continue;
245 };
246 for expr in mfp.expressions.iter_mut() {
247 s(expr)?;
248 }
249 for (_, expr) in mfp.predicates.iter_mut() {
250 s(expr)?;
251 }
252 }
253 Ok(())
254 }
255}
256
257impl<P, S> DataflowDescription<P, S> {
258 /// Creates a new dataflow description with a human-readable name.
259 pub fn new(name: String) -> Self {
260 Self {
261 source_imports: Default::default(),
262 index_imports: Default::default(),
263 objects_to_build: Vec::new(),
264 index_exports: Default::default(),
265 sink_exports: Default::default(),
266 as_of: Default::default(),
267 until: Antichain::new(),
268 initial_storage_as_of: None,
269 refresh_schedule: None,
270 debug_name: name,
271 time_dependence: None,
272 }
273 }
274
275 /// Sets the `as_of` frontier to the supplied argument.
276 ///
277 /// This method allows the dataflow to indicate a frontier up through
278 /// which all times should be advanced. This can be done for at least
279 /// two reasons: 1. correctness and 2. performance.
280 ///
281 /// Correctness may require an `as_of` to ensure that historical detail
282 /// is consolidated at representative times that do not present specific
283 /// detail that is not specifically correct. For example, updates may be
284 /// compacted to times that are no longer the source times, but instead
285 /// some byproduct of when compaction was executed; we should not present
286 /// those specific times as meaningfully different from other equivalent
287 /// times.
288 ///
289 /// Performance may benefit from an aggressive `as_of` as it reduces the
290 /// number of distinct moments at which collections vary. Differential
291 /// dataflow will refresh its outputs at each time its inputs change and
292 /// to moderate that we can minimize the volume of distinct input times
293 /// as much as possible.
294 ///
295 /// Generally, one should consider setting `as_of` at least to the `since`
296 /// frontiers of contributing data sources and as aggressively as the
297 /// computation permits.
298 pub fn set_as_of(&mut self, as_of: Antichain<Timestamp>) {
299 self.as_of = Some(as_of);
300 }
301
302 /// Records the initial `as_of` of the storage collection associated with a materialized view.
303 pub fn set_initial_as_of(&mut self, initial_as_of: Antichain<Timestamp>) {
304 self.initial_storage_as_of = Some(initial_as_of);
305 }
306
307 /// Identifiers of imported objects (indexes and sources).
308 pub fn import_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
309 self.imported_index_ids().chain(self.imported_source_ids())
310 }
311
312 /// Identifiers of imported indexes.
313 pub fn imported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
314 self.index_imports.keys().copied()
315 }
316
317 /// Identifiers of imported sources.
318 pub fn imported_source_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
319 self.source_imports.keys().copied()
320 }
321
322 /// Identifiers of exported objects (indexes and sinks).
323 pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
324 self.exported_index_ids().chain(self.exported_sink_ids())
325 }
326
327 /// Identifiers of exported indexes.
328 pub fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
329 self.index_exports.keys().copied()
330 }
331
332 /// Identifiers of exported sinks.
333 pub fn exported_sink_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
334 self.sink_exports.keys().copied()
335 }
336
337 /// Identifiers of exported persist sinks.
338 pub fn persist_sink_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
339 self.sink_exports
340 .iter()
341 .filter_map(|(id, desc)| match desc.connection {
342 ComputeSinkConnection::MaterializedView(_) => Some(*id),
343 _ => None,
344 })
345 }
346
347 /// Identifiers of exported subscribe sinks.
348 pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
349 self.sink_exports
350 .iter()
351 .filter_map(|(id, desc)| match desc.connection {
352 ComputeSinkConnection::Subscribe(_) => Some(*id),
353 _ => None,
354 })
355 }
356
357 /// Identifiers of exported copy to sinks.
358 pub fn copy_to_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
359 self.sink_exports
360 .iter()
361 .filter_map(|(id, desc)| match desc.connection {
362 ComputeSinkConnection::CopyToS3Oneshot(_) => Some(*id),
363 _ => None,
364 })
365 }
366
367 /// Produce a `Display`able value containing the import IDs of this dataflow.
368 pub fn display_import_ids(&self) -> impl fmt::Display + '_ {
369 use mz_ore::str::{bracketed, separated};
370 bracketed("[", "]", separated(", ", self.import_ids()))
371 }
372
373 /// Produce a `Display`able value containing the export IDs of this dataflow.
374 pub fn display_export_ids(&self) -> impl fmt::Display + '_ {
375 use mz_ore::str::{bracketed, separated};
376 bracketed("[", "]", separated(", ", self.export_ids()))
377 }
378
379 /// Whether this dataflow installs transient collections.
380 pub fn is_transient(&self) -> bool {
381 self.export_ids().all(|id| id.is_transient())
382 }
383
384 /// Returns the description of the object to build with the specified
385 /// identifier.
386 ///
387 /// # Panics
388 ///
389 /// Panics if `id` is not present in `objects_to_build` exactly once.
390 pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
391 let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
392 let build = builds
393 .next()
394 .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
395 assert!(builds.next().is_none());
396 build
397 }
398
399 /// Returns the id of the dataflow's sink export.
400 ///
401 /// # Panics
402 ///
403 /// Panics if the dataflow has no sink exports or has more than one.
404 pub fn sink_id(&self) -> GlobalId {
405 let sink_exports = &self.sink_exports;
406 let sink_id = sink_exports.keys().into_element();
407 *sink_id
408 }
409}
410
411impl<P, S> DataflowDescription<P, S>
412where
413 P: CollectionPlan,
414{
415 /// Computes the set of identifiers upon which the specified collection
416 /// identifier depends.
417 ///
418 /// `collection_id` must specify a valid object in `objects_to_build`.
419 ///
420 /// This method includes identifiers for e.g. intermediate views, and should be filtered
421 /// if one only wants sources and indexes.
422 ///
423 /// This method is safe for mutually recursive view definitions.
424 pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
425 let mut out = BTreeSet::new();
426 self.depends_on_into(collection_id, &mut out);
427 out
428 }
429
430 /// Like `depends_on`, but appends to an existing `BTreeSet`.
431 pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
432 out.insert(collection_id);
433 if self.source_imports.contains_key(&collection_id) {
434 // The collection is provided by an imported source. Report the
435 // dependency on the source.
436 out.insert(collection_id);
437 return;
438 }
439
440 // NOTE(benesch): we're not smart enough here to know *which* index
441 // for the collection will be used, if one exists, so we have to report
442 // the dependency on all of them.
443 let mut found_index = false;
444 for (index_id, IndexImport { desc, .. }) in &self.index_imports {
445 if desc.on_id == collection_id {
446 // The collection is provided by an imported index. Report the
447 // dependency on the index.
448 out.insert(*index_id);
449 found_index = true;
450 }
451 }
452 if found_index {
453 return;
454 }
455
456 // The collection is not provided by a source or imported index.
457 // It must be a collection whose plan we have handy. Recurse.
458 let build = self.build_desc(collection_id);
459 for id in build.plan.depends_on() {
460 if !out.contains(&id) {
461 self.depends_on_into(id, out)
462 }
463 }
464 }
465
466 /// Computes the set of imports upon which the specified collection depends.
467 ///
468 /// This method behaves like `depends_on` but filters out internal dependencies that are not
469 /// included in the dataflow imports.
470 pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
471 let is_import = |id: &GlobalId| {
472 self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
473 };
474
475 let deps = self.depends_on(collection_id);
476 deps.into_iter().filter(is_import).collect()
477 }
478}
479
480impl<S> DataflowDescription<RenderPlan, S>
481where
482 S: Clone + PartialEq,
483{
484 /// Determine if a dataflow description is compatible with this dataflow description.
485 ///
486 /// Compatible dataflows have structurally equal exports, imports, and objects to build. The
487 /// `as_of` of the receiver has to be less equal the `other` `as_of`.
488 ///
489 /// Note that this method performs normalization as part of the structural equality checking,
490 /// which involves cloning both `self` and `other`. It is therefore relatively expensive and
491 /// should only be used on cold code paths.
492 ///
493 // TODO: The semantics of this function are only useful for command reconciliation at the moment.
494 pub fn compatible_with(&self, other: &Self) -> bool {
495 let old = self.as_comparable();
496 let new = other.as_comparable();
497
498 let equality = old.index_exports == new.index_exports
499 && old.sink_exports == new.sink_exports
500 && old.objects_to_build == new.objects_to_build
501 && old.index_imports == new.index_imports
502 && old.source_imports == new.source_imports
503 && old.time_dependence == new.time_dependence;
504
505 let partial = if let (Some(old_as_of), Some(new_as_of)) = (&old.as_of, &new.as_of) {
506 timely::PartialOrder::less_equal(old_as_of, new_as_of)
507 } else {
508 false
509 };
510
511 equality && partial
512 }
513
514 /// Returns a `DataflowDescription` that has the same structure as `self` and can be
515 /// structurally compared to other `DataflowDescription`s.
516 ///
517 /// The function normalizes several properties. It replaces transient `GlobalId`s
518 /// that are only used internally (i.e. not imported nor exported) with consecutive IDs
519 /// starting from `t1`. It replaces the source import's `upper` by a dummy value.
520 fn as_comparable(&self) -> Self {
521 let external_ids: BTreeSet<_> = self.import_ids().chain(self.export_ids()).collect();
522
523 let mut id_counter = 0;
524 let mut replacements = BTreeMap::new();
525
526 let mut maybe_replace = |id: GlobalId| {
527 if id.is_transient() && !external_ids.contains(&id) {
528 *replacements.entry(id).or_insert_with(|| {
529 id_counter += 1;
530 GlobalId::Transient(id_counter)
531 })
532 } else {
533 id
534 }
535 };
536
537 let mut source_imports = self.source_imports.clone();
538 for import in source_imports.values_mut() {
539 import.upper = Antichain::new();
540 }
541
542 let mut objects_to_build = self.objects_to_build.clone();
543 for object in &mut objects_to_build {
544 object.id = maybe_replace(object.id);
545 object.plan.replace_ids(&mut maybe_replace);
546 }
547
548 let mut index_exports = self.index_exports.clone();
549 for (desc, _typ) in index_exports.values_mut() {
550 desc.on_id = maybe_replace(desc.on_id);
551 }
552
553 let mut sink_exports = self.sink_exports.clone();
554 for desc in sink_exports.values_mut() {
555 desc.from = maybe_replace(desc.from);
556 }
557
558 DataflowDescription {
559 source_imports,
560 index_imports: self.index_imports.clone(),
561 objects_to_build,
562 index_exports,
563 sink_exports,
564 as_of: self.as_of.clone(),
565 until: self.until.clone(),
566 initial_storage_as_of: self.initial_storage_as_of.clone(),
567 refresh_schedule: self.refresh_schedule.clone(),
568 debug_name: self.debug_name.clone(),
569 time_dependence: self.time_dependence.clone(),
570 }
571 }
572}
573
/// A commonly used name for dataflows containing MIR expressions.
pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;
576
/// An index storing processed updates so they can be queried
/// or reused in other computations.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
    /// Identity of the collection the index is on.
    pub on_id: GlobalId,
    /// Expressions to be arranged, in order of decreasing primacy.
    pub key: Vec<MirScalarExpr>,
}
586
/// Information about an imported index, and how it will be used by the dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexImport {
    /// Description of the index (the collection it is on and its keys).
    pub desc: IndexDesc,
    /// Schema and keys of the object the index is on.
    pub typ: ReprRelationType,
    /// Whether the index will supply monotonic data.
    pub monotonic: bool,
    /// Whether this import must include the snapshot data.
    pub with_snapshot: bool,
}
599
/// Information about an imported source, and how it will be used by the dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceImport<S: 'static = ()> {
    /// Description of the source instance to import.
    pub desc: SourceInstanceDesc<S>,
    /// Whether the source will supply monotonic data.
    pub monotonic: bool,
    /// Whether this import must include the snapshot data.
    pub with_snapshot: bool,
    /// The initial known upper frontier for the source.
    pub upper: Antichain<Timestamp>,
}
612
/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
    /// The global identifier under which the built collection is made
    /// available (see `DataflowDescription::insert_plan`).
    pub id: GlobalId,
    /// The plan to build for this collection; `P` varies with the
    /// optimization stage (e.g. `OptimizedMirRelationExpr`, `Plan`).
    pub plan: P,
}