// mz_compute_client/as_of_selection.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for selecting as-ofs of compute dataflows during system initialization.
//!
//! The functionality implemented here is invoked by the coordinator during its bootstrap process.
//! Ideally, it would be part of the controller and transparent to the coordinator, but that's
//! difficult to reconcile with the current controller API. For now, we still make the coordinator
//! worry about as-of selection but keep the implementation in a compute crate because it really is
//! a compute implementation concern.
//!
//! The as-of selection process takes a list of `DataflowDescription`s, determines compatible
//! as-ofs for the compute collections they export, and augments the `DataflowDescription`s with
//! these as-ofs.
//!
//! For each compute collection, the as-of selection process keeps an `AsOfBounds` instance that
//! tracks a lower and an upper bound for the as-of the collection may get assigned. Throughout the
//! process, a collection's `AsOfBounds` get repeatedly refined, by increasing the lower bound and
//! decreasing the upper bound. The final upper bound is then used for the collection as-of. Using
//! the upper bound maximizes the chances of compute reconciliation being effective, and minimizes
//! the amount of historical data that must be read from the dataflow sources.
//!
//! Refinement of `AsOfBounds` is performed by applying `Constraint`s to collections. A
//! `Constraint` specifies which bound should be refined to which frontier. A `Constraint` may be
//! "hard" or "soft", which determines how failure to apply it is handled. Failing to apply a hard
//! constraint is treated as an error; failing to apply a soft constraint is not. If a constraint
//! fails to apply, the respective `AsOfBounds` are refined as much as possible (to a single
//! frontier) and marked as "sealed". Subsequent constraint applications against the sealed bounds
//! are no-ops. This is done to avoid log noise from repeated constraint application failures.
//!
//! Note that failing to apply a hard constraint does not abort the as-of selection process for the
//! affected collection. Instead the failure is handled gracefully by logging an error and
//! assigning the collection a best-effort as-of. This is done, rather than panicking or returning
//! an error and letting the coordinator panic, to ensure the availability of the system. Ideally,
//! we would instead mark the affected dataflow as failed/poisoned, but such a mechanism doesn't
//! currently exist.
//!
//! The as-of selection process applies constraints in order of importance, because once a
//! constraint application fails, the respective `AsOfBounds` are sealed and later applications
//! won't have any effect. This means hard constraints must be applied before soft constraints, and
//! more desirable soft constraints should be applied before less desirable ones.
//!
//! # `AsOfBounds` Invariants
//!
//! Correctness requires two invariants of `AsOfBounds` of dependent collections:
//!
//!  (1) The lower bound of a collection is >= the lower bound of each of its inputs.
//!  (2) The upper bound of a collection is >= the upper bound of each of its inputs.
//!
//! Each step of the as-of selection process needs to ensure that these invariants are upheld once
//! it completes. The expectation is that each step (a) performs local changes to either the
//! `lower` _or_ the `upper` bounds of some collections and (b) invokes the appropriate
//! `propagate_bounds_*` method to restore the invariant broken by (a).
//!
//! For steps that behave as described in (a), we can prove that (b) will always succeed in
//! applying the bounds propagation constraints:
//!
//! | Let `A` and `B` be any pair of collections where `A` is an input of `B`.
//! | Before (a), both invariants are upheld, i.e. `A.lower <= B.lower` and `A.upper <= B.upper`.
//! |
//! | Case 1: (a) increases `A.lower` and/or `B.lower` to `A.lower'` and `B.lower'`
//! |     Invariant (1) might be broken; we need to prove that it can be restored.
//! |     Case 1.a: `A.lower' <= B.lower'`
//! |         Invariant (1) is still upheld without propagation.
//! |     Case 1.b: `A.lower' > B.lower'`
//! |         A collection's lower bound can only be increased up to its upper bound.
//! |         Therefore, and from invariant (2): `A.lower' <= A.upper <= B.upper`
//! |         Therefore, propagation can set `B.lower' = A.lower'`, restoring invariant (1).
//! | Case 2: (a) decreases `A.upper` and/or `B.upper`
//! |     Invariant (2) might be broken; we need to prove that it can be restored.
//! |     The proof is equivalent to Case 1.

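The invariant-restoration argument above can be exercised with a toy model. The following standalone sketch (hypothetical, not part of this module) uses plain `u64` timestamps in place of antichains: raising `A.lower` breaks invariant (1), and downstream propagation restores it without violating `B`'s own bounds, exactly as Case 1.b argues.

```rust
// Toy model of invariant (1)/(2) maintenance for a chain A -> B.
// Scalar u64 timestamps stand in for antichains; not the real code.
#[derive(Clone, Copy)]
struct Bounds {
    lower: u64,
    upper: u64,
}

/// Restore invariant (1), `B.lower >= A.lower`, by propagating downstream.
fn propagate_lower_downstream(a: Bounds, b: &mut Bounds) {
    if b.lower < a.lower {
        // Case 1.b of the proof: A.lower' <= A.upper <= B.upper,
        // so raising B.lower to A.lower' stays within B's bounds.
        b.lower = a.lower;
    }
}

fn main() {
    let mut a = Bounds { lower: 0, upper: 50 };
    let mut b = Bounds { lower: 0, upper: 80 };
    // Step (a): a hard constraint raises A's lower bound (capped by A.upper).
    a.lower = 30;
    // Invariant (1) is now broken: A.lower > B.lower.
    assert!(a.lower > b.lower);
    // Step (b): propagation restores it, and B.lower stays <= B.upper.
    propagate_lower_downstream(a, &mut b);
    assert_eq!(b.lower, 30);
    assert!(b.lower <= b.upper);
}
```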
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, Timestamp};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::Antichain;
use tracing::{info, warn};

/// Runs as-of selection for the given dataflows.
///
/// Assigns the selected as-of to the provided dataflow descriptions and returns a set of
/// `ReadHold`s that must not be dropped nor downgraded until the dataflows have been installed
/// with the compute controller.
pub fn run(
    dataflows: &mut [DataflowDescription<Plan, ()>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy>,
    storage_collections: &dyn StorageCollections,
    current_time: Timestamp,
    read_only_mode: bool,
) -> BTreeMap<GlobalId, ReadHold> {
    // Get read holds for the storage inputs of the dataflows.
    // This ensures that storage frontiers don't advance past the selected as-ofs.
    let mut storage_read_holds = BTreeMap::new();
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows that sink into a storage collection that has advanced to the empty frontier don't
    // need to be installed at all. So we can apply an optimization where we prune them here and
    // assign them an empty as-of at the end.
    ctx.prune_sealed_persist_sinks();

    // During 0dt upgrades, it can happen that the leader environment drops storage collections,
    // making the read-only environment observe inconsistent read frontiers (database-issues#8836).
    // To avoid hard-constraint failures, we prune all collections depending on these dropped
    // storage collections.
    if read_only_mode {
        ctx.prune_dropped_collections();
    }

    // Apply hard constraints from upstream and downstream storage collections.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    // At this point all collections have as-of bounds that reflect what is required for
    // correctness. The current state isn't very usable though. In particular, most of the upper
    // bounds are likely to be the empty frontier, so if we'd select as-ofs on this basis, the
    // resulting dataflows would never hydrate. Instead we'll apply a number of soft constraints to
    // end up in a better place.

    // Constrain collection as-ofs to times that are currently available in the inputs. This
    // ensures that dataflows can immediately start hydrating. It also ensures that dataflows don't
    // get an empty as-of, except when they exclusively depend on constant collections.
    //
    // Allowing dataflows to hydrate immediately is desirable. It allows them to complete the most
    // resource-intensive phase of their lifecycle as early as possible. Ideally we want this phase
    // to occur during a 0dt upgrade, where we still have the option to roll back if a cluster
    // doesn't come up successfully. For dataflows with a refresh schedule, hydrating early also
    // ensures that there isn't a large output delay when the refresh time is reached.
    ctx.apply_warmup_constraints();

    // Constrain as-ofs of indexes according to their read policies.
    ctx.apply_index_read_policy_constraints();

    // Constrain as-ofs of indexes to the current time. This ensures that indexes are immediately
    // readable.
    ctx.apply_index_current_time_constraints();

    // Apply the derived as-of bounds to the dataflows.
    for dataflow in dataflows {
        // `AsOfBounds` are shared between the exports of a dataflow, so looking at just the first
        // export is sufficient.
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

/// Bounds for possible as-of values of a dataflow.
#[derive(Debug)]
struct AsOfBounds {
    lower: Antichain<Timestamp>,
    upper: Antichain<Timestamp>,
    /// Whether these bounds can still change.
    sealed: bool,
}

impl AsOfBounds {
    /// Creates an `AsOfBounds` that only allows the given `frontier`.
    fn single(frontier: Antichain<Timestamp>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Get the bound of the given type.
    fn get(&self, type_: BoundType) -> &Antichain<Timestamp> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl Default for AsOfBounds {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(Timestamp::MIN),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl fmt::Display for AsOfBounds {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

/// Types of bounds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

/// Types of constraints.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints are applied to enforce correctness properties, and failing to apply them is
    /// an error.
    Hard,
    /// Soft constraints are applied to improve performance or UX, and failing to apply them is
    /// undesirable but not an error.
    Soft,
}

/// A constraint that can be applied to the `AsOfBounds` of a collection.
#[derive(Debug)]
struct Constraint<'a> {
    type_: ConstraintType,
    /// Which bound this constraint applies to.
    bound_type: BoundType,
    /// The frontier by which the bound should be constrained.
    frontier: &'a Antichain<Timestamp>,
    /// A short description of the reason for applying this constraint.
    ///
    /// Used only for logging.
    reason: &'a str,
}

impl Constraint<'_> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns a bool indicating whether the given bounds were changed as a result.
    ///
    /// Applying a constraint can fail, if the constraint frontier is incompatible with the
    /// existing bounds. In this case, the constraint still gets partially applied by moving one of
    /// the bounds up/down to the other, depending on the `bound_type`.
    ///
    /// Applying a constraint to sealed bounds is a no-op.
    fn apply(&self, bounds: &mut AsOfBounds) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

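The `Result<bool, bool>` contract of `apply` (success or failure on the outside, "did the bounds change" on the inside) can be mimicked with scalar timestamps. A hypothetical standalone sketch of the upper-bound arm, not the real implementation:

```rust
// Scalar model of applying an upper-bound constraint: Ok/Err reports
// whether the constraint applied cleanly, the inner bool whether the
// bounds changed. Sealed bounds ignore all further constraints.
struct Bounds {
    lower: u64,
    upper: u64,
    sealed: bool,
}

fn apply_upper(bounds: &mut Bounds, frontier: u64) -> Result<bool, bool> {
    if bounds.sealed {
        return Ok(false);
    }
    if frontier < bounds.lower {
        // Incompatible: partially apply by collapsing `upper` onto `lower`.
        bounds.sealed = true;
        if bounds.upper > bounds.lower {
            bounds.upper = bounds.lower;
            Err(true)
        } else {
            Err(false)
        }
    } else if bounds.upper <= frontier {
        Ok(false) // already at least this tight
    } else {
        bounds.upper = frontier;
        Ok(true)
    }
}

fn main() {
    let mut b = Bounds { lower: 10, upper: 100, sealed: false };
    assert_eq!(apply_upper(&mut b, 40), Ok(true)); // tightened
    assert_eq!(apply_upper(&mut b, 60), Ok(false)); // no-op
    assert_eq!(apply_upper(&mut b, 5), Err(true)); // failed, collapsed to lower
    assert_eq!((b.lower, b.upper, b.sealed), (10, 10, true));
    assert_eq!(apply_upper(&mut b, 0), Ok(false)); // sealed: no-op
}
```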
/// State tracked for a compute collection during as-of selection.
struct Collection<'a> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy>,
    /// The currently known as-of bounds.
    ///
    /// Shared between collections exported by the same dataflow.
    bounds: Rc<RefCell<AsOfBounds>>,
    /// Whether this collection is an index.
    is_index: bool,
}

/// The as-of selection context.
struct Context<'a> {
    collections: BTreeMap<GlobalId, Collection<'a>>,
    storage_collections: &'a dyn StorageCollections,
    current_time: Timestamp,
}

impl<'a> Context<'a> {
    /// Initializes an as-of selection context for the given `dataflows`.
    fn new(
        dataflows: &[DataflowDescription<Plan, ()>],
        storage_collections: &'a dyn StorageCollections,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy>,
        current_time: Timestamp,
    ) -> Self {
        // Construct initial collection state for each dataflow export. Dataflows might have their
        // as-ofs already fixed, which we need to take into account when constructing `AsOfBounds`.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

    /// Returns the state of the identified collection.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection doesn't exist.
    fn expect_collection(&self, id: GlobalId) -> &Collection<'_> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

    /// Applies the given as-of constraint to the identified collection.
    ///
    /// Returns whether the collection's as-of bounds were changed as a result.
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

    /// Apply as-of constraints imposed by the frontiers of upstream storage collections.
    ///
    /// A collection's as-of _must_ be >= the read frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint to a collection is an error. The affected dataflow will
    /// not be able to hydrate successfully.
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold>,
    ) {
        // Apply direct constraints from storage inputs.
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate constraints downstream, restoring `AsOfBounds` invariant (1).
        self.propagate_bounds_downstream(BoundType::Lower);
    }

    /// Apply as-of constraints imposed by the frontiers of downstream storage collections.
    ///
    /// A collection's as-of _must_ be < the write frontier of the storage collection it exports to
    /// (if any) if it is non-empty, and <= the storage collection's read frontier otherwise.
    ///
    /// Rationale:
    ///
    /// * A collection's as-of must be <= the write frontier of its dependent storage collection,
    ///   because we need to pick up computing the contents of storage collections where we left
    ///   off previously, to avoid skipped times observable in the durable output.
    /// * Some dataflows feeding into storage collections (specifically: continual tasks) need to
    ///   be able to observe input changes at times they write to the output. If we selected the
    ///   as-of to be equal to the write frontier of the output storage collection, we wouldn't be
    ///   able to produce the correct output at that frontier. Thus the selected as-of must be
    ///   strictly less than the write frontier.
    /// * As an exception to the above, if the output storage collection is empty (i.e. its write
    ///   frontier is <= its read frontier), we need to allow the as-of to be equal to the read
    ///   frontier. This is correct in the sense that it mirrors the timestamp selection behavior
    ///   of the sequencer when it created the collection. Chances are that the sequencer chose the
    ///   initial as-of (and therefore the initial read frontier of the storage collection) as the
    ///   smallest possible time that can still be read from the collection inputs, so forcing the
    ///   upper bound any lower than that read frontier would produce a hard constraint violation.
    ///
    /// Failing to apply this constraint to a collection is an error. The storage collection it
    /// exports to may have times visible to readers skipped in its output, violating correctness.
    fn apply_downstream_storage_constraints(&self) {
        // Apply direct constraints from storage exports.
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                step_back_frontier(&frontiers.write_frontier)
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // Propagate constraints upstream, restoring `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

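The `step_back_frontier` helper used above is not shown in this excerpt. A hypothetical standalone sketch of its likely behavior, with `u64` slices standing in for `Antichain<Timestamp>`: each element steps back by one tick, saturating at the minimum timestamp, and the empty frontier stays empty.

```rust
// Hypothetical sketch of a `step_back_frontier` helper: steps every
// element of a frontier back by one tick, saturating at the minimum
// timestamp. Plain u64s stand in for `Antichain<Timestamp>`.
fn step_back_frontier(frontier: &[u64]) -> Vec<u64> {
    frontier.iter().map(|t| t.saturating_sub(1)).collect()
}

fn main() {
    // A write frontier of [101] yields an as-of upper bound of [100]:
    // strictly less than the write frontier, as the rationale requires.
    assert_eq!(step_back_frontier(&[101]), vec![100]);
    // The minimum timestamp cannot be stepped back further.
    assert_eq!(step_back_frontier(&[0]), vec![0]);
    // The empty frontier stays empty.
    assert_eq!(step_back_frontier(&[]), Vec::<u64>::new());
}
```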
    /// Apply as-of constraints to ensure collections can hydrate immediately.
    ///
    /// A collection's as-of _should_ be < the write frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint is not an error. The affected dataflow will not be able to
    /// hydrate immediately, but it will be able to hydrate once its inputs have sufficiently
    /// advanced.
    fn apply_warmup_constraints(&self) {
        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the warmup constraint.
        for (id, write_frontier) in write_frontiers {
            let upper = step_back_frontier(&write_frontier);
            let constraint = Constraint {
                type_: ConstraintType::Soft,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!(
                    "warmup frontier derived from storage write frontier {:?}",
                    write_frontier.elements()
                ),
            };
            self.apply_constraint(id, constraint);
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

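The `fixpoint` helper used above (not shown in this excerpt) reruns a closure until a pass reports no changes, so write frontiers flow through compute inputs regardless of iteration order. A simplified standalone sketch of the same idea, with scalar `u64` frontiers, `max` as the frontier join, and a made-up dependency graph:

```rust
use std::collections::BTreeMap;

// Simplified sketch of fixpoint frontier propagation: each collection's
// write frontier is joined (here: max) with the frontiers of its compute
// inputs, iterating until a pass changes nothing. Scalar u64 frontiers
// stand in for antichains; the graph in `main` is hypothetical.
fn propagate(
    frontiers: &mut BTreeMap<&'static str, u64>,
    inputs: &BTreeMap<&'static str, Vec<&'static str>>,
) {
    loop {
        let mut changed = false;
        for (id, deps) in inputs {
            let joined = deps
                .iter()
                .map(|d| frontiers[d])
                .chain(std::iter::once(frontiers[id]))
                .max()
                .unwrap();
            if joined > frontiers[id] {
                frontiers.insert(*id, joined);
                changed = true;
            }
        }
        if !changed {
            break;
        }
    }
}

fn main() {
    // c depends on b, b depends on a; only a has a storage input frontier.
    let mut frontiers = BTreeMap::from([("a", 42u64), ("b", 0), ("c", 0)]);
    let inputs = BTreeMap::from([
        ("a", vec![]),
        ("b", vec!["a"]),
        ("c", vec!["b"]),
    ]);
    propagate(&mut frontiers, &inputs);
    // The frontier reaches every transitive dependent.
    assert_eq!(frontiers["b"], 42);
    assert_eq!(frontiers["c"], 42);
}
```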
    /// Apply as-of constraints to ensure indexes contain historical data as requested by their
    /// associated read policies.
    ///
    /// An index's as-of _should_ be <= the frontier determined by its read policy applied to its
    /// write frontier.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not contain
    /// historical times for its entire compaction window initially, but will do so once sufficient
    /// time has passed.
    fn apply_index_read_policy_constraints(&self) {
        // For the write frontier of an index, we'll use the least write frontier of its
        // (transitive) storage inputs. This is an upper bound for the write frontier the index
        // could have had before the restart. For indexes without storage inputs we use the current
        // time.

        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the read policy constraint to indexes.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

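The read-policy constraint derives an upper bound by applying the policy to the recovered write frontier. A hypothetical scalar sketch of a lag-based policy (the `ReadPolicy` type lives in `mz_storage_types` and is richer than this): the allowed frontier trails the write frontier by a fixed compaction window, saturating at the minimum timestamp.

```rust
// Hypothetical sketch of a lag-based read policy: the derived frontier
// trails the write frontier by a fixed compaction window, saturating at
// the minimum timestamp. Scalar u64s stand in for antichains.
fn policy_frontier(write_frontier: u64, lag: u64) -> u64 {
    write_frontier.saturating_sub(lag)
}

fn main() {
    // With inputs written up to t=5000 and a window of 1000 ticks, the
    // as-of upper bound is 4000, so the index can immediately serve
    // reads one compaction window into the past.
    assert_eq!(policy_frontier(5000, 1000), 4000);
    // Early in a deployment the window saturates at the minimum time.
    assert_eq!(policy_frontier(500, 1000), 0);
}
```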
    /// Apply as-of constraints to ensure indexes are immediately readable.
    ///
    /// An index's as-of _should_ be <= the current time.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not be readable
    /// immediately, but will be readable once sufficient time has passed.
    fn apply_index_current_time_constraints(&self) {
        // Apply the current time constraint to indexes.
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

652    /// Propagate as-of bounds through the dependency graph, in downstream direction.
653    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
654        // Propagating `lower` bounds downstream restores `AsOfBounds` invariant (1) and must
655        // therefore always succeed.
656        let constraint_type = match bound_type {
657            BoundType::Lower => ConstraintType::Hard,
658            BoundType::Upper => ConstraintType::Soft,
659        };
660
661        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
662        // so we use a fixpoint loop here.
663        fixpoint(|changed| {
664            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);
665
666            // Propagating `upper` bounds downstream might break `AsOfBounds` invariant (2), so we
667            // need to restore it.
668            if bound_type == BoundType::Upper {
669                self.propagate_bounds_upstream_inner(
670                    BoundType::Upper,
671                    ConstraintType::Hard,
672                    changed,
673                );
674            }
675        });
676    }
677
678    fn propagate_bounds_downstream_inner(
679        &self,
680        bound_type: BoundType,
681        constraint_type: ConstraintType,
682        changed: &mut bool,
683    ) {
684        for (id, collection) in &self.collections {
685            for input_id in &collection.compute_inputs {
686                let input_collection = self.expect_collection(*input_id);
687                let bounds = input_collection.bounds.borrow();
688                let constraint = Constraint {
689                    type_: constraint_type,
690                    bound_type,
691                    frontier: bounds.get(bound_type),
692                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
693                };
694                *changed |= self.apply_constraint(*id, constraint);
695            }
696        }
697    }
698
    /// Propagate as-of bounds through the dependency graph, in the upstream direction.
700    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
701        // Propagating `upper` bounds upstream restores `AsOfBounds` invariant (2) and must
702        // therefore always succeed.
703        let constraint_type = match bound_type {
704            BoundType::Lower => ConstraintType::Soft,
705            BoundType::Upper => ConstraintType::Hard,
706        };
707
708        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
709        // so we use a fixpoint loop here.
710        fixpoint(|changed| {
711            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);
712
713            // Propagating `lower` bounds upstream might break `AsOfBounds` invariant (1), so we
714            // need to restore it.
715            if bound_type == BoundType::Lower {
716                self.propagate_bounds_downstream_inner(
717                    BoundType::Lower,
718                    ConstraintType::Hard,
719                    changed,
720                );
721            }
722        });
723    }
724
725    fn propagate_bounds_upstream_inner(
726        &self,
727        bound_type: BoundType,
728        constraint_type: ConstraintType,
729        changed: &mut bool,
730    ) {
731        for (id, collection) in self.collections.iter().rev() {
732            let bounds = collection.bounds.borrow();
733            for input_id in &collection.compute_inputs {
734                let constraint = Constraint {
735                    type_: constraint_type,
736                    bound_type,
737                    frontier: bounds.get(bound_type),
738                    reason: &format!("downstream {id} {bound_type} as-of bound"),
739                };
740                *changed |= self.apply_constraint(*input_id, constraint);
741            }
742        }
743    }
744
745    /// Selects the "best" as-of for the identified collection, based on its currently known
746    /// bounds.
747    ///
748    /// We simply use the upper bound here, to maximize the chances of compute reconciliation
749    /// succeeding. Choosing the latest possible as-of also minimizes the amount of work the
750    /// dataflow has to spend processing historical data from its sources.
751    fn best_as_of(&self, id: GlobalId) -> Antichain<Timestamp> {
752        if let Some(collection) = self.collections.get(&id) {
753            let bounds = collection.bounds.borrow();
754            bounds.upper.clone()
755        } else {
756            Antichain::new()
757        }
758    }
759
760    /// Removes collections that sink into sealed persist shards from the context.
761    ///
762    /// The dataflows of these collections will get an empty default as-of assigned at the end of
763    /// the as-of selection process, ensuring that they won't get installed unnecessarily.
764    ///
765    /// Note that it is valid to remove these collections from consideration because they don't
766    /// impose as-of constraints on other compute collections.
767    fn prune_sealed_persist_sinks(&mut self) {
768        self.collections.retain(|id, _| {
769            self.storage_collections
770                .collection_frontiers(*id)
771                .map_or(true, |f| !f.write_frontier.is_empty())
772        });
773    }
774
775    /// Removes collections depending on storage collections with empty read frontiers.
776    ///
777    /// The dataflows of these collections will get an empty default as-of assigned at the end of
778    /// the as-of selection process, ensuring that they won't get installed.
779    ///
780    /// This exists only to work around database-issues#8836.
781    fn prune_dropped_collections(&mut self) {
782        // Remove collections with dropped storage inputs.
783        let mut pruned = BTreeSet::new();
784        self.collections.retain(|id, c| {
785            let input_dropped = c.storage_inputs.iter().any(|id| {
786                let frontiers = self
787                    .storage_collections
788                    .collection_frontiers(*id)
789                    .expect("storage collection exists");
790                frontiers.read_capabilities.is_empty()
791            });
792
793            if input_dropped {
794                pruned.insert(*id);
795                false
796            } else {
797                true
798            }
799        });
800
        if !pruned.is_empty() {
            warn!(?pruned, "pruned dependants of dropped storage collections");
        }
802
803        // Remove (transitive) dependants of pruned collections.
804        while !pruned.is_empty() {
805            let pruned_inputs = std::mem::take(&mut pruned);
806
807            self.collections.retain(|id, c| {
808                if c.compute_inputs.iter().any(|id| pruned_inputs.contains(id)) {
809                    pruned.insert(*id);
810                    false
811                } else {
812                    true
813                }
814            });
815
            if !pruned.is_empty() {
                warn!(?pruned, "pruned collections with pruned inputs");
            }
817        }
818    }
819}
820
821/// Runs `step` in a loop until it stops reporting changes.
822fn fixpoint(mut step: impl FnMut(&mut bool)) {
823    loop {
824        let mut changed = false;
825        step(&mut changed);
826        if !changed {
827            break;
828        }
829    }
830}
831
/// Step back the given frontier.
///
/// This function is saturating: if the frontier contains `Timestamp::MIN` times, these are kept
/// unchanged.
836fn step_back_frontier(frontier: &Antichain<Timestamp>) -> Antichain<Timestamp> {
837    frontier
838        .iter()
839        .map(|t| t.step_back().unwrap_or(Timestamp::MIN))
840        .collect()
841}
842
843#[cfg(test)]
844mod tests {
845    use std::collections::BTreeSet;
846
847    use async_trait::async_trait;
848    use futures::future::BoxFuture;
849    use futures::stream::BoxStream;
850    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
851    use mz_compute_types::sinks::ComputeSinkConnection;
852    use mz_compute_types::sinks::ComputeSinkDesc;
853    use mz_compute_types::sinks::MaterializedViewSinkConnection;
854    use mz_compute_types::sources::SourceInstanceArguments;
855    use mz_compute_types::sources::SourceInstanceDesc;
856    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
857    use mz_persist_types::ShardId;
858    use mz_repr::{RelationDesc, RelationVersion, Row, SqlRelationType};
859    use mz_repr::{ReprRelationType, Timestamp};
860    use mz_storage_client::client::TimestamplessUpdateBuilder;
861    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
862    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
863    use mz_storage_types::StorageDiff;
864    use mz_storage_types::controller::{CollectionMetadata, StorageError};
865    use mz_storage_types::errors::CollectionMissing;
866    use mz_storage_types::parameters::StorageParameters;
867    use mz_storage_types::sources::SourceData;
868    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
869
870    use super::*;
871
872    const SEALED: u64 = 0x5ea1ed;
873
874    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
875        if ts == SEALED {
876            Antichain::new()
877        } else {
878            Antichain::from_elem(ts.into())
879        }
880    }
881
882    #[derive(Debug)]
883    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);
884
885    #[async_trait]
886    impl StorageCollections for StorageFrontiers {
887        async fn initialize_state(
888            &self,
889            _txn: &mut (dyn StorageTxn + Send),
890            _init_ids: BTreeSet<GlobalId>,
891        ) -> Result<(), StorageError> {
892            unimplemented!()
893        }
894
895        fn update_parameters(&self, _config_params: StorageParameters) {
896            unimplemented!()
897        }
898
899        fn collection_metadata(
900            &self,
901            _id: GlobalId,
902        ) -> Result<CollectionMetadata, CollectionMissing> {
903            unimplemented!()
904        }
905
906        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
907            unimplemented!()
908        }
909
910        fn collections_frontiers(
911            &self,
912            ids: Vec<GlobalId>,
913        ) -> Result<Vec<CollectionFrontiers>, CollectionMissing> {
914            let mut frontiers = Vec::with_capacity(ids.len());
915            for id in ids {
916                let (read, write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
917                frontiers.push(CollectionFrontiers {
918                    id,
919                    write_frontier: write.clone(),
920                    implied_capability: read.clone(),
921                    read_capabilities: read.clone(),
922                })
923            }
924            Ok(frontiers)
925        }
926
927        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers> {
928            unimplemented!()
929        }
930
931        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError> {
932            unimplemented!()
933        }
934
935        async fn snapshot_stats(
936            &self,
937            _id: GlobalId,
938            _as_of: Antichain<Timestamp>,
939        ) -> Result<SnapshotStats, StorageError> {
940            unimplemented!()
941        }
942
943        async fn snapshot_parts_stats(
944            &self,
945            _id: GlobalId,
946            _as_of: Antichain<Timestamp>,
947        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>> {
948            unimplemented!()
949        }
950
951        fn snapshot(
952            &self,
953            _id: GlobalId,
954            _as_of: Timestamp,
955        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
956            unimplemented!()
957        }
958
959        async fn snapshot_latest(&self, _id: GlobalId) -> Result<Vec<Row>, StorageError> {
960            unimplemented!()
961        }
962
963        fn snapshot_cursor(
964            &self,
965            _id: GlobalId,
966            _as_of: Timestamp,
967        ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>> {
968            unimplemented!()
969        }
970
971        fn snapshot_and_stream(
972            &self,
973            _id: GlobalId,
974            _as_of: Timestamp,
975        ) -> BoxFuture<
976            'static,
977            Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
978        > {
979            unimplemented!()
980        }
981
982        fn create_update_builder(
983            &self,
984            _id: GlobalId,
985        ) -> BoxFuture<
986            'static,
987            Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
988        > {
989            unimplemented!()
990        }
991
992        async fn prepare_state(
993            &self,
994            _txn: &mut (dyn StorageTxn + Send),
995            _ids_to_add: BTreeSet<GlobalId>,
996            _ids_to_drop: BTreeSet<GlobalId>,
997            _ids_to_register: BTreeMap<GlobalId, ShardId>,
998        ) -> Result<(), StorageError> {
999            unimplemented!()
1000        }
1001
1002        async fn create_collections_for_bootstrap(
1003            &self,
1004            _storage_metadata: &StorageMetadata,
1005            _register_ts: Option<Timestamp>,
1006            _collections: Vec<(GlobalId, CollectionDescription)>,
1007            _migrated_storage_collections: &BTreeSet<GlobalId>,
1008        ) -> Result<(), StorageError> {
1009            unimplemented!()
1010        }
1011
1012        async fn alter_table_desc(
1013            &self,
1014            _existing_collection: GlobalId,
1015            _new_collection: GlobalId,
1016            _new_desc: RelationDesc,
1017            _expected_version: RelationVersion,
1018        ) -> Result<(), StorageError> {
1019            unimplemented!()
1020        }
1021
1022        fn drop_collections_unvalidated(
1023            &self,
1024            _storage_metadata: &StorageMetadata,
1025            _identifiers: Vec<GlobalId>,
1026        ) {
1027            unimplemented!()
1028        }
1029
1030        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy)>) {
1031            unimplemented!()
1032        }
1033
1034        fn acquire_read_holds(
1035            &self,
1036            desired_holds: Vec<GlobalId>,
1037        ) -> Result<Vec<ReadHold>, CollectionMissing> {
1038            let mut holds = Vec::with_capacity(desired_holds.len());
1039            for id in desired_holds {
1040                let (read, _write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
1041                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
1042                holds.push(ReadHold::with_channel(id, read.clone(), tx));
1043            }
1044            Ok(holds)
1045        }
1046
1047        fn determine_time_dependence(
1048            &self,
1049            _id: GlobalId,
1050        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
1051            unimplemented!()
1052        }
1053
1054        fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
1055            unimplemented!()
1056        }
1057    }
1058
1059    fn dataflow(
1060        export_id: &str,
1061        input_ids: &[&str],
1062        storage_ids: &BTreeSet<&str>,
1063    ) -> DataflowDescription<Plan> {
1064        let source_imports = input_ids
1065            .iter()
1066            .filter(|s| storage_ids.contains(*s))
1067            .map(|s| {
1068                let id = s.parse().unwrap();
1069                let desc = SourceInstanceDesc {
1070                    arguments: SourceInstanceArguments {
1071                        operators: Default::default(),
1072                    },
1073                    storage_metadata: Default::default(),
1074                    typ: SqlRelationType::empty(),
1075                };
1076                (
1077                    id,
1078                    mz_compute_types::dataflows::SourceImport {
1079                        desc,
1080                        monotonic: Default::default(),
1081                        with_snapshot: true,
1082                        upper: Default::default(),
1083                    },
1084                )
1085            })
1086            .collect();
1087        let index_imports = input_ids
1088            .iter()
1089            .filter(|s| !storage_ids.contains(*s))
1090            .map(|s| {
1091                let id = s.parse().unwrap();
1092                let import = IndexImport {
1093                    desc: IndexDesc {
1094                        on_id: GlobalId::Transient(0),
1095                        key: Default::default(),
1096                    },
1097                    typ: ReprRelationType::empty(),
1098                    monotonic: Default::default(),
1099                    with_snapshot: true,
1100                };
1101                (id, import)
1102            })
1103            .collect();
1104        let index_exports = std::iter::once(export_id)
1105            .filter(|s| !storage_ids.contains(*s))
1106            .map(|sid| {
1107                let id = sid.parse().unwrap();
1108                let desc = IndexDesc {
1109                    on_id: GlobalId::Transient(0),
1110                    key: Default::default(),
1111                };
1112                let typ = ReprRelationType::empty();
1113                (id, (desc, typ))
1114            })
1115            .collect();
1116        let sink_exports = std::iter::once(export_id)
1117            .filter(|s| storage_ids.contains(*s))
1118            .map(|sid| {
1119                let id = sid.parse().unwrap();
1120                let desc = ComputeSinkDesc {
1121                    from: GlobalId::Transient(0),
1122                    from_desc: RelationDesc::empty(),
1123                    connection: ComputeSinkConnection::MaterializedView(
1124                        MaterializedViewSinkConnection {
1125                            value_desc: RelationDesc::empty(),
1126                            storage_metadata: Default::default(),
1127                        },
1128                    ),
1129                    with_snapshot: Default::default(),
1130                    up_to: Default::default(),
1131                    non_null_assertions: Default::default(),
1132                    refresh_schedule: Default::default(),
1133                };
1134                (id, desc)
1135            })
1136            .collect();
1137
1138        DataflowDescription {
1139            source_imports,
1140            index_imports,
1141            objects_to_build: Default::default(),
1142            index_exports,
1143            sink_exports,
1144            as_of: None,
1145            until: Default::default(),
1146            initial_storage_as_of: Default::default(),
1147            refresh_schedule: Default::default(),
1148            debug_name: Default::default(),
1149            time_dependence: None,
1150        }
1151    }
1152
1153    macro_rules! testcase {
1154        ($name:ident, {
1155            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
1156            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
1157            current_time: $current_time:literal,
1158            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
1159            $( read_only: $read_only:expr, )?
1160        }) => {
1161            #[mz_ore::test]
1162            fn $name() {
1163                let storage_ids = [$( $storage_id, )*].into();
1164
1165                let storage_frontiers = StorageFrontiers(BTreeMap::from([
1166                    $(
1167                        (
1168                            $storage_id.parse().unwrap(),
1169                            (ts_to_frontier($read), ts_to_frontier($write)),
1170                        ),
1171                    )*
1172                ]));
1173
1174                let mut dataflows = [
1175                    $(
1176                        dataflow($export_id, &$inputs, &storage_ids),
1177                    )*
1178                ];
1179
1180                let read_policies = BTreeMap::from([
1181                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
1182                ]);
1183
1184                #[allow(unused_variables)]
1185                let read_only = false;
1186                $( let read_only = $read_only; )?
1187
1188                super::run(
1189                    &mut dataflows,
1190                    &read_policies,
1191                    &storage_frontiers,
1192                    $current_time.into(),
1193                    read_only,
1194                );
1195
1196                let actual_as_ofs: Vec<_> = dataflows
1197                    .into_iter()
1198                    .map(|d| d.as_of.unwrap())
1199                    .collect();
1200                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];
1201
1202                assert_eq!(actual_as_ofs, expected_as_ofs);
1203            }
1204        };
1205    }
1206
1207    testcase!(upstream_storage_constraints, {
1208        storage: {
1209            "s1": (10, 20),
1210            "s2": (20, 30),
1211        },
1212        dataflows: [
1213            "u1" <- ["s1"]       => 10,
1214            "u2" <- ["s2"]       => 20,
1215            "u3" <- ["s1", "s2"] => 20,
1216            "u4" <- ["u1", "u2"] => 20,
1217        ],
1218        current_time: 0,
1219    });
1220
1221    testcase!(downstream_storage_constraints, {
1222        storage: {
1223            "s1": (10, 20),
1224            "u3": (10, 15),
1225            "u4": (10, 13),
1226        },
1227        dataflows: [
1228            "u1" <- ["s1"] => 19,
1229            "u2" <- ["s1"] => 12,
1230            "u3" <- ["u2"] => 14,
1231            "u4" <- ["u2"] => 12,
1232        ],
1233        current_time: 100,
1234    });
1235
1236    testcase!(warmup_constraints, {
1237        storage: {
1238            "s1": (10, 20),
1239            "s2": (10, 30),
1240            "s3": (10, 40),
1241            "s4": (10, 50),
1242        },
1243        dataflows: [
1244            "u1" <- ["s1"]       => 19,
1245            "u2" <- ["s2"]       => 19,
1246            "u3" <- ["s3"]       => 39,
1247            "u4" <- ["s4"]       => 39,
1248            "u5" <- ["u1", "u2"] => 19,
1249            "u6" <- ["u3", "u4"] => 39,
1250        ],
1251        current_time: 100,
1252    });
1253
1254    testcase!(index_read_policy_constraints, {
1255        storage: {
1256            "s1": (10, 20),
1257            "u6": (10, 18),
1258        },
1259        dataflows: [
1260            "u1" <- ["s1"] => 15,
1261            "u2" <- ["s1"] => 10,
1262            "u3" <- ["s1"] => 13,
1263            "u4" <- ["s1"] => 10,
1264            "u5" <- []     => 95,
1265            "u6" <- ["s1"] => 17,
1266        ],
1267        current_time: 100,
1268        read_policies: {
1269            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
1270            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
1271            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
1272            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
1273            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
1274            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
1275        },
1276    });
1277
1278    testcase!(index_current_time_constraints, {
1279        storage: {
1280            "s1": (10, 20),
1281            "s2": (20, 30),
1282            "u4": (10, 12),
1283            "u5": (10, 18),
1284        },
1285        dataflows: [
1286            "u1" <- ["s1"] => 15,
1287            "u2" <- ["s2"] => 20,
1288            "u3" <- ["s1"] => 11,
1289            "u4" <- ["u3"] => 11,
1290            "u5" <- ["s1"] => 17,
1291            "u6" <- []     => 15,
1292        ],
1293        current_time: 15,
1294    });
1295
1296    testcase!(sealed_storage_sink, {
1297        storage: {
1298            "s1": (10, 20),
1299            "u1": (10, SEALED),
1300        },
1301        dataflows: [
1302            "u1" <- ["s1"] => SEALED,
1303        ],
1304        current_time: 100,
1305    });
1306
1307    testcase!(read_only_dropped_storage_inputs, {
1308        storage: {
1309            "s1": (10, 20),
1310            "s2": (SEALED, SEALED),
1311            "u4": (10, 20),
1312        },
1313        dataflows: [
1314            "u1" <- ["s1"] => 15,
1315            "u2" <- ["s2"] => SEALED,
1316            "u3" <- ["s1", "s2"] => SEALED,
1317            "u4" <- ["u2"] => SEALED,
1318        ],
1319        current_time: 15,
1320        read_only: true,
1321    });
1322
1323    // Regression test for database-issues#9273.
1324    testcase!(github_9273, {
1325        storage: {
1326            "s1": (10, 20),
1327            "u3": (14, 15),
1328        },
1329        dataflows: [
1330            "u1" <- ["s1"] => 14,
1331            "u2" <- ["u1"] => 19,
1332            "u3" <- ["u1"] => 14,
1333        ],
1334        current_time: 100,
1335        read_policies: {
1336            "u1": ReadPolicy::lag_writes_by(1.into(), 1.into()),
1337            "u2": ReadPolicy::lag_writes_by(1.into(), 1.into()),
1338        },
1339    });
1340}