mz_compute_client/as_of_selection.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for selecting as-ofs of compute dataflows during system initialization.
//!
//! The functionality implemented here is invoked by the coordinator during its bootstrap process.
//! Ideally, it would be part of the controller and transparent to the coordinator, but that's
//! difficult to reconcile with the current controller API. For now, we still make the coordinator
//! worry about as-of selection but keep the implementation in a compute crate because it really is
//! a compute implementation concern.
//!
//! The as-of selection process takes a list of `DataflowDescription`s, determines compatible
//! as-ofs for the compute collections they export, and augments the `DataflowDescription`s with
//! these as-ofs.
//!
//! For each compute collection, the as-of selection process keeps an `AsOfBounds` instance that
//! tracks a lower and an upper bound for the as-of the collection may get assigned. Throughout the
//! process, a collection's `AsOfBounds` are repeatedly refined by increasing the lower bound and
//! decreasing the upper bound. The final upper bound is then used as the collection's as-of. Using
//! the upper bound maximizes the chances of compute reconciliation being effective, and minimizes
//! the amount of historical data that must be read from the dataflow sources.
//!
//! Refinement of `AsOfBounds` is performed by applying `Constraint`s to collections. A
//! `Constraint` specifies which bound should be refined to which frontier. A `Constraint` may be
//! "hard" or "soft", which determines how failure to apply it is handled. Failing to apply a hard
//! constraint is treated as an error; failing to apply a soft constraint is not. If a constraint
//! fails to apply, the respective `AsOfBounds` are refined as much as possible (to a single
//! frontier) and marked as "sealed". Subsequent constraint applications against the sealed bounds
//! are no-ops. This is done to avoid log noise from repeated constraint application failures.
//!
//! Note that failing to apply a hard constraint does not abort the as-of selection process for the
//! affected collection. Instead, the failure is handled gracefully by logging an error and
//! assigning the collection a best-effort as-of. This is done, rather than panicking or returning
//! an error and letting the coordinator panic, to ensure the availability of the system. Ideally,
//! we would instead mark the affected dataflow as failed/poisoned, but such a mechanism doesn't
//! currently exist.
//!
//! The as-of selection process applies constraints in order of importance, because once a
//! constraint application fails, the respective `AsOfBounds` are sealed and later applications
//! won't have any effect. This means hard constraints must be applied before soft constraints, and
//! more desirable soft constraints should be applied before less desirable ones.
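//!
//! For example (a worked sketch of the semantics above): a collection with the default bounds
//! `[[0] .. []]` that receives a hard lower-bound constraint at `[10]` followed by a soft
//! upper-bound constraint at `[20]` ends up with bounds `[[10] .. [20]]`. A later soft
//! upper-bound constraint at `[5]` cannot be applied because it lies below the lower bound; the
//! bounds collapse to the single frontier `[[10] .. [10]]` and are sealed, so all subsequent
//! constraint applications are no-ops.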
//!
//! # `AsOfBounds` Invariants
//!
//! Correctness requires two invariants to hold between the `AsOfBounds` of dependent collections:
//!
//!  (1) The lower bound of a collection is >= the lower bound of each of its inputs.
//!  (2) The upper bound of a collection is >= the upper bound of each of its inputs.
//!
//! Each step of the as-of selection process needs to ensure that these invariants are upheld once
//! it completes. The expectation is that each step (a) performs local changes to either the
//! `lower` _or_ the `upper` bounds of some collections and (b) invokes the appropriate
//! `propagate_bounds_*` method to restore the invariant broken by (a).
//!
//! For steps that behave as described in (a), we can prove that (b) will always succeed in
//! applying the bounds propagation constraints:
//!
//! | Let `A` and `B` be any pair of collections where `A` is an input of `B`.
//! | Before (a), both invariants are upheld, i.e. `A.lower <= B.lower` and `A.upper <= B.upper`.
//! |
//! | Case 1: (a) increases `A.lower` and/or `B.lower` to `A.lower'` and `B.lower'`.
//! |     Invariant (1) might be broken, so we need to prove that it can be restored.
//! |     Case 1.a: `A.lower' <= B.lower'`
//! |         Invariant (1) is still upheld without propagation.
//! |     Case 1.b: `A.lower' > B.lower'`
//! |         A collection's lower bound can only be increased up to its upper bound.
//! |         Therefore, and from invariant (2): `A.lower' <= A.upper <= B.upper`
//! |         Therefore, propagation can set `B.lower' = A.lower'`, restoring invariant (1).
//! | Case 2: (a) decreases `A.upper` and/or `B.upper`.
//! |     Invariant (2) might be broken, so we need to prove that it can be restored.
//! |     The proof is analogous to Case 1.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::info;

/// Runs as-of selection for the given dataflows.
///
/// Assigns the selected as-of to the provided dataflow descriptions and returns a set of
/// `ReadHold`s that must not be dropped nor downgraded until the dataflows have been installed
/// with the compute controller.
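///
/// A sketch of the intended call pattern during coordinator bootstrap (the surrounding names
/// here are hypothetical):
///
/// ```ignore
/// let read_holds = as_of_selection::run(
///     &mut dataflows,
///     &read_policies,
///     &*storage_collections,
///     now,
/// );
/// install_dataflows_with_compute_controller(dataflows);
/// drop(read_holds); // safe to release only after installation
/// ```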
pub fn run<T: TimestampManipulation>(
    dataflows: &mut [DataflowDescription<Plan<T>, (), T>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy<T>>,
    storage_collections: &dyn StorageCollections<Timestamp = T>,
    current_time: T,
) -> BTreeMap<GlobalId, ReadHold<T>> {
    // Get read holds for the storage inputs of the dataflows.
    // This ensures that storage frontiers don't advance past the selected as-ofs.
    let mut storage_read_holds = BTreeMap::new();
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows that sink into a storage collection that has advanced to the empty frontier don't
    // need to be installed at all. So we can apply an optimization where we prune them here and
    // assign them an empty as-of at the end.
    ctx.prune_sealed_persist_sinks();

    // Apply hard constraints from upstream and downstream storage collections.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    // At this point all collections have as-of bounds that reflect what is required for
    // correctness. The current state isn't very usable though. In particular, most of the upper
    // bounds are likely to be the empty frontier, so if we selected as-ofs on this basis, the
    // resulting dataflows would never hydrate. Instead we'll apply a number of soft constraints to
    // end up in a better place.

    // Constrain collection as-ofs to times that are currently available in the inputs. This
    // ensures that dataflows can immediately start hydrating. It also ensures that dataflows don't
    // get an empty as-of, except when they exclusively depend on constant collections.
    ctx.apply_warmup_constraints();

    // Constrain as-ofs of indexes according to their read policies.
    ctx.apply_index_read_policy_constraints();

    // Constrain as-ofs of indexes to the current time. This ensures that indexes are immediately
    // readable.
    ctx.apply_index_current_time_constraints();

    // Apply the derived as-of bounds to the dataflows.
    for dataflow in dataflows {
        // `AsOfBounds` are shared between the exports of a dataflow, so looking at just the first
        // export is sufficient.
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

/// Bounds for possible as-of values of a dataflow.
#[derive(Debug)]
struct AsOfBounds<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    /// Whether these bounds can still change.
    sealed: bool,
}

impl<T: Clone> AsOfBounds<T> {
    /// Creates an `AsOfBounds` that only allows the given `frontier`.
    fn single(frontier: Antichain<T>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Get the bound of the given type.
    fn get(&self, type_: BoundType) -> &Antichain<T> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl<T: Timestamp> Default for AsOfBounds<T> {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(T::minimum()),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl<T: fmt::Debug> fmt::Display for AsOfBounds<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

/// Types of bounds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

/// Types of constraints.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints are applied to enforce correctness properties, and failing to apply them is
    /// an error.
    Hard,
    /// Soft constraints are applied to improve performance or UX, and failing to apply them is
    /// undesirable but not an error.
    Soft,
}

/// A constraint that can be applied to the `AsOfBounds` of a collection.
#[derive(Debug)]
struct Constraint<'a, T> {
    type_: ConstraintType,
    /// Which bound this constraint applies to.
    bound_type: BoundType,
    /// The frontier by which the bound should be constrained.
    frontier: &'a Antichain<T>,
    /// A short description of the reason for applying this constraint.
    ///
    /// Used only for logging.
    reason: &'a str,
}

impl<T: Timestamp> Constraint<'_, T> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns a bool indicating whether the given bounds were changed as a result.
    ///
    /// Applying a constraint can fail if the constraint frontier is incompatible with the
    /// existing bounds. In this case, the constraint still gets partially applied by moving one of
    /// the bounds up/down to the other, depending on the `bound_type`.
    ///
    /// Applying a constraint to sealed bounds is a no-op.
    fn apply(&self, bounds: &mut AsOfBounds<T>) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

/// State tracked for a compute collection during as-of selection.
struct Collection<'a, T> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy<T>>,
    /// The currently known as-of bounds.
    ///
    /// Shared between collections exported by the same dataflow.
    bounds: Rc<RefCell<AsOfBounds<T>>>,
    /// Whether this collection is an index.
    is_index: bool,
}

/// The as-of selection context.
struct Context<'a, T> {
    collections: BTreeMap<GlobalId, Collection<'a, T>>,
    storage_collections: &'a dyn StorageCollections<Timestamp = T>,
    current_time: T,
}

impl<'a, T: TimestampManipulation> Context<'a, T> {
    /// Initializes an as-of selection context for the given `dataflows`.
    fn new(
        dataflows: &[DataflowDescription<Plan<T>, (), T>],
        storage_collections: &'a dyn StorageCollections<Timestamp = T>,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy<T>>,
        current_time: T,
    ) -> Self {
        // Construct initial collection state for each dataflow export. Dataflows might have their
        // as-ofs already fixed, which we need to take into account when constructing `AsOfBounds`.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

    /// Returns the state of the identified collection.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection doesn't exist.
    fn expect_collection(&self, id: GlobalId) -> &Collection<T> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

    /// Applies the given as-of constraint to the identified collection.
    ///
    /// Returns whether the collection's as-of bounds were changed as a result.
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint<T>) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

    /// Apply as-of constraints imposed by the frontiers of upstream storage collections.
    ///
    /// A collection's as-of _must_ be >= the read frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint to a collection is an error. The affected dataflow will
    /// not be able to hydrate successfully.
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold<T>>,
    ) {
        // Apply direct constraints from storage inputs.
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate constraints downstream, restoring `AsOfBounds` invariant (1).
        self.propagate_bounds_downstream(BoundType::Lower);
    }

    /// Apply as-of constraints imposed by the frontiers of downstream storage collections.
    ///
    /// A collection's as-of _must_ be < the write frontier of the storage collection it exports
    /// to (if any) when that collection is non-empty, and <= the storage collection's read
    /// frontier otherwise.
    ///
    /// Rationale:
    ///
    /// * A collection's as-of must be <= the write frontier of its dependent storage collection,
    ///   because we need to pick up computing the contents of storage collections where we left
    ///   off previously, to avoid skipped times observable in the durable output.
    /// * Some dataflows feeding into storage collections (specifically: continual tasks) need to
    ///   be able to observe input changes at times they write to the output. If we selected the
    ///   as-of to be equal to the write frontier of the output storage collection, we wouldn't be
    ///   able to produce the correct output at that frontier. Thus the selected as-of must be
    ///   strictly less than the write frontier.
    /// * As an exception to the above, if the output storage collection is empty (i.e. its write
    ///   frontier is <= its read frontier), we need to allow the as-of to be equal to the read
    ///   frontier. This is correct in the sense that it mirrors the timestamp selection behavior
    ///   of the sequencer when it created the collection. Chances are that the sequencer chose the
    ///   initial as-of (and therefore the initial read frontier of the storage collection) as the
    ///   smallest possible time that can still be read from the collection inputs, so forcing the
    ///   upper bound any lower than that read frontier would produce a hard constraint violation.
    ///
    /// Failing to apply this constraint to a collection is an error. The storage collection it
    /// exports to may have times visible to readers skipped in its output, violating correctness.
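    ///
    /// For illustration (a sketch consistent with the rules above): an export whose storage
    /// collection has read frontier `[10]` and write frontier `[15]` gets its upper bound
    /// constrained to `[14]`, while an export whose storage collection is empty (read and write
    /// frontier both `[10]`) gets its upper bound constrained to the read frontier `[10]`.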
    fn apply_downstream_storage_constraints(&self) {
        // Apply direct constraints from storage exports.
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                Antichain::from_iter(
                    frontiers
                        .write_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // Propagate constraints upstream, restoring `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure collections can hydrate immediately.
    ///
    /// A collection's as-of _should_ be < the write frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint is not an error. The affected dataflow will not be able to
    /// hydrate immediately, but it will be able to hydrate once its inputs have sufficiently
    /// advanced.
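    ///
    /// For example, a collection whose only storage input has write frontier `[20]` gets its
    /// upper bound constrained to `[19]`, the greatest time already available in that input.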
    fn apply_warmup_constraints(&self) {
        // Apply direct constraints from storage inputs.
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let frontiers = self
                    .storage_collections
                    .collection_frontiers(*input_id)
                    .expect("storage collection exists");
                let upper = step_back_frontier(&frontiers.write_frontier);
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!("storage input {input_id} warmup frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate constraints downstream. This transparently repairs any violations of
        // `AsOfBounds` invariant (2) that the propagation might introduce.
        self.propagate_bounds_downstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes contain historical data as requested by their
    /// associated read policies.
    ///
    /// An index's as-of _should_ be <= the frontier determined by its read policy applied to its
    /// write frontier.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not contain
    /// historical times for its entire compaction window initially, but will do so once sufficient
    /// time has passed.
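    ///
    /// For example, an index whose only storage input has write frontier `[30]`, under a read
    /// policy that lags the write frontier by 5, gets its upper bound constrained to `[25]`.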
    fn apply_index_read_policy_constraints(&self) {
        // For the write frontier of an index, we'll use the least write frontier of its
        // (transitive) storage inputs. This is an upper bound for the write frontier the index
        // could have had before the restart. For indexes without storage inputs we use the current
        // time.

        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let write_frontier = write_frontiers.get_mut(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let input_collection = self.expect_collection(*input_id);
                    let bounds = input_collection.bounds.borrow();
                    *changed |= write_frontier.extend(bounds.upper.iter().cloned());
                }
            }
        });

        // Apply the read policy constraint to indexes.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes are immediately readable.
    ///
    /// An index's as-of _should_ be <= the current time.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not be readable
    /// immediately, but will be readable once sufficient time has passed.
    fn apply_index_current_time_constraints(&self) {
        // Apply the current time constraint to indexes.
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Propagate as-of bounds through the dependency graph, in the downstream direction.
    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
        // Propagating `lower` bounds downstream restores `AsOfBounds` invariant (1) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Hard,
            BoundType::Upper => ConstraintType::Soft,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);

            // Propagating `upper` bounds downstream might break `AsOfBounds` invariant (2), so we
            // need to restore it.
            if bound_type == BoundType::Upper {
                self.propagate_bounds_upstream_inner(
                    BoundType::Upper,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_downstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.compute_inputs {
                let input_collection = self.expect_collection(*input_id);
                let bounds = input_collection.bounds.borrow();
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*id, constraint);
            }
        }
    }

    /// Propagate as-of bounds through the dependency graph, in the upstream direction.
    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
        // Propagating `upper` bounds upstream restores `AsOfBounds` invariant (2) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Soft,
            BoundType::Upper => ConstraintType::Hard,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);

            // Propagating `lower` bounds upstream might break `AsOfBounds` invariant (1), so we
            // need to restore it.
            if bound_type == BoundType::Lower {
                self.propagate_bounds_downstream_inner(
                    BoundType::Lower,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_upstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in self.collections.iter().rev() {
            let bounds = collection.bounds.borrow();
            for input_id in &collection.compute_inputs {
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("downstream {id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*input_id, constraint);
            }
        }
    }

    /// Selects the "best" as-of for the identified collection, based on its currently known
    /// bounds.
    ///
    /// We simply use the upper bound here, to maximize the chances of compute reconciliation
    /// succeeding. Choosing the latest possible as-of also minimizes the amount of work the
    /// dataflow has to spend processing historical data from its sources.
    fn best_as_of(&self, id: GlobalId) -> Antichain<T> {
        if let Some(collection) = self.collections.get(&id) {
            let bounds = collection.bounds.borrow();
            bounds.upper.clone()
        } else {
            Antichain::new()
        }
    }

    /// Removes collections that sink into sealed persist shards from the context.
    ///
    /// The dataflows of these collections will get an empty default as-of assigned at the end of
    /// the as-of selection process, ensuring that they won't get installed unnecessarily.
    ///
    /// Note that it is valid to remove these collections from consideration because they don't
    /// impose as-of constraints on other compute collections.
    fn prune_sealed_persist_sinks(&mut self) {
        self.collections.retain(|id, _| {
            self.storage_collections
                .collection_frontiers(*id)
                .map_or(true, |f| !f.write_frontier.is_empty())
        });
    }
}

/// Runs `step` in a loop until it stops reporting changes.
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    loop {
        let mut changed = false;
        step(&mut changed);
        if !changed {
            break;
        }
    }
}

/// Step back the given frontier.
///
/// This method is saturating: if the frontier contains `T::minimum()` times, these are kept
/// unchanged.
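///
/// For example, `[10]` steps back to `[9]`, while `[0]` and the empty frontier are left as they
/// are.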
fn step_back_frontier<T: TimestampManipulation>(frontier: &Antichain<T>) -> Antichain<T> {
    frontier
        .iter()
        .map(|t| t.step_back().unwrap_or_else(T::minimum))
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use differential_dataflow::lattice::Lattice;
    use futures::future::BoxFuture;
    use futures::stream::BoxStream;
    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
    use mz_compute_types::sinks::ComputeSinkConnection;
    use mz_compute_types::sinks::ComputeSinkDesc;
    use mz_compute_types::sinks::MaterializedViewSinkConnection;
    use mz_compute_types::sources::SourceInstanceArguments;
    use mz_compute_types::sources::SourceInstanceDesc;
    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
    use mz_persist_types::{Codec64, ShardId};
    use mz_repr::Timestamp;
    use mz_repr::{RelationDesc, RelationType, RelationVersion, Row};
    use mz_storage_client::client::TimestamplessUpdateBuilder;
    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
    use mz_storage_types::StorageDiff;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::controller::{CollectionMetadata, StorageError};
    use mz_storage_types::parameters::StorageParameters;
    use mz_storage_types::read_holds::ReadHoldError;
    use mz_storage_types::sources::{GenericSourceConnection, SourceDesc};
    use mz_storage_types::sources::{SourceData, SourceExportDataConfig};
    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
    use timely::progress::Timestamp as TimelyTimestamp;

    use super::*;

    const SEALED: u64 = 0x5ea1ed;

    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
        if ts == SEALED {
            Antichain::new()
        } else {
            Antichain::from_elem(ts.into())
        }
    }

    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);

    #[async_trait]
    impl StorageCollections for StorageFrontiers {
        type Timestamp = Timestamp;

        async fn initialize_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _init_ids: BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn update_parameters(&self, _config_params: StorageParameters) {
            unimplemented!()
        }

        fn collection_metadata(
            &self,
            _id: GlobalId,
        ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
            unimplemented!()
        }

        fn collections_frontiers(
            &self,
            ids: Vec<GlobalId>,
        ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>
        {
            let mut frontiers = Vec::with_capacity(ids.len());
            for id in ids {
                let (read, write) = self.0.get(&id).ok_or(StorageError::IdentifierMissing(id))?;
                frontiers.push(CollectionFrontiers {
                    id,
                    write_frontier: write.clone(),
                    implied_capability: read.clone(),
                    read_capabilities: read.clone(),
                })
            }
            Ok(frontiers)
        }

        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
            unimplemented!()
        }

        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_parts_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
            unimplemented!()
        }

        fn snapshot(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>
        {
            unimplemented!()
        }

        async fn snapshot_latest(
            &self,
            _id: GlobalId,
        ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn snapshot_cursor(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>,
        >
        where
            Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
        {
            unimplemented!()
        }

        fn snapshot_and_stream(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<
                BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
                StorageError<Self::Timestamp>,
            >,
        > {
            unimplemented!()
        }

        /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
        /// updates for the provided [`GlobalId`].
        fn create_update_builder(
            &self,
            _id: GlobalId,
        ) -> BoxFuture<
            'static,
            Result<
                TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
                StorageError<Self::Timestamp>,
            >,
        >
        where
            Self::Timestamp: Lattice + Codec64,
        {
            unimplemented!()
        }

        async fn prepare_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _ids_to_add: BTreeSet<GlobalId>,
            _ids_to_drop: BTreeSet<GlobalId>,
            _ids_to_register: BTreeMap<GlobalId, ShardId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn create_collections_for_bootstrap(
            &self,
            _storage_metadata: &StorageMetadata,
            _register_ts: Option<Self::Timestamp>,
            _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
            _migrated_storage_collections: &BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_source_desc(
            &self,
            _ingestion_id: GlobalId,
            _source_desc: SourceDesc,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_export_data_configs(
            &self,
            _source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_connections(
            &self,
            _source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_table_desc(
            &self,
            _existing_collection: GlobalId,
            _new_collection: GlobalId,
            _new_desc: RelationDesc,
            _expected_version: RelationVersion,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn drop_collections_unvalidated(
            &self,
            _storage_metadata: &StorageMetadata,
            _identifiers: Vec<GlobalId>,
        ) {
            unimplemented!()
        }

        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
            unimplemented!()
        }

        fn acquire_read_holds(
            &self,
            desired_holds: Vec<GlobalId>,
        ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
            let mut holds = Vec::with_capacity(desired_holds.len());
            for id in desired_holds {
                let (read, _write) = self
                    .0
                    .get(&id)
                    .ok_or(ReadHoldError::CollectionMissing(id))?;
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                holds.push(ReadHold::with_channel(id, read.clone(), tx));
            }
            Ok(holds)
        }

        fn determine_time_dependence(
            &self,
            _id: GlobalId,
        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
            unimplemented!()
        }
    }

    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: RelationType::empty(),
                };
                (id, (desc, Default::default(), Default::default()))
            })
            .collect();
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: RelationType::empty(),
                    monotonic: Default::default(),
                };
                (id, import)
            })
            .collect();
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = RelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }

    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }

    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"]       => 10,
            "u2" <- ["s2"]       => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 12,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"]       => 19,
            "u2" <- ["s2"]       => 19,
            "u3" <- ["s3"]       => 39,
            "u4" <- ["s4"]       => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- []     => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- []     => 15,
        ],
        current_time: 15,
    });

    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });
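
    // A minimal test sketch of the `Constraint::apply` contract documented above: a successful
    // refinement, a failed application that collapses and seals the bounds, and the no-op
    // behavior of sealed bounds. It exercises only items defined in this module.
    #[mz_ore::test]
    fn constraint_apply_contract() {
        let frontier = |ts: u64| Antichain::from_elem(Timestamp::from(ts));
        let f10 = frontier(10);
        let f5 = frontier(5);

        // Default bounds: `[[0] .. []]`.
        let mut bounds = AsOfBounds::default();

        // Raising the lower bound within the current bounds succeeds and reports a change.
        let raise_lower = Constraint {
            type_: ConstraintType::Hard,
            bound_type: BoundType::Lower,
            frontier: &f10,
            reason: "test",
        };
        assert_eq!(raise_lower.apply(&mut bounds), Ok(true));

        // Lowering the upper bound below the lower bound fails: the bounds collapse to the
        // single frontier `[10]` and become sealed.
        let conflicting_upper = Constraint {
            type_: ConstraintType::Soft,
            bound_type: BoundType::Upper,
            frontier: &f5,
            reason: "test",
        };
        assert_eq!(conflicting_upper.apply(&mut bounds), Err(true));
        assert!(bounds.sealed);
        assert_eq!(bounds.lower, f10);
        assert_eq!(bounds.upper, f10);

        // Applying further constraints to sealed bounds is a no-op.
        assert_eq!(raise_lower.apply(&mut bounds), Ok(false));
    }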
}