mz_compute_client/as_of_selection.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for selecting as-ofs of compute dataflows during system initialization.
//!
//! The functionality implemented here is invoked by the coordinator during its bootstrap process.
//! Ideally, it would be part of the controller and transparent to the coordinator, but that's
//! difficult to reconcile with the current controller API. For now, we still make the coordinator
//! worry about as-of selection but keep the implementation in a compute crate because it really is
//! a compute implementation concern.
//!
//! The as-of selection process takes a list of `DataflowDescription`s, determines compatible
//! as-ofs for the compute collections they export, and augments the `DataflowDescription`s with
//! these as-ofs.
//!
//! For each compute collection, the as-of selection process keeps an `AsOfBounds` instance that
//! tracks a lower and an upper bound for the as-of the collection may get assigned. Throughout the
//! process, a collection's `AsOfBounds` get repeatedly refined, by increasing the lower bound and
//! decreasing the upper bound. The final upper bound is then used for the collection as-of. Using
//! the upper bound maximizes the chances of compute reconciliation being effective, and minimizes
//! the amount of historical data that must be read from the dataflow sources.
//!
//! Refinement of `AsOfBounds` is performed by applying `Constraint`s to collections. A
//! `Constraint` specifies which bound should be refined to which frontier. A `Constraint` may be
//! "hard" or "soft", which determines how failure to apply it is handled. Failing to apply a hard
//! constraint is treated as an error; failing to apply a soft constraint is not. If a constraint
//! fails to apply, the respective `AsOfBounds` are refined as much as possible (to a single
//! frontier) and marked as "sealed". Subsequent constraint applications against the sealed bounds
//! are no-ops. This is done to avoid log noise from repeated constraint application failures.
//!
//! Note that failing to apply a hard constraint does not abort the as-of selection process for the
//! affected collection. Instead the failure is handled gracefully by logging an error and
//! assigning the collection a best-effort as-of. This is done, rather than panicking or returning
//! an error and letting the coordinator panic, to ensure the availability of the system. Ideally,
//! we would instead mark the affected dataflow as failed/poisoned, but such a mechanism doesn't
//! currently exist.
//!
//! The as-of selection process applies constraints in order of importance, because once a
//! constraint application fails, the respective `AsOfBounds` are sealed and later applications
//! won't have any effect. This means hard constraints must be applied before soft constraints, and
//! more desirable soft constraints should be applied before less desirable ones.
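//!
//! For illustration, the bounds of a single collection might evolve as follows during the
//! process (the frontiers here are hypothetical, chosen only for this sketch):
//!
//! ```text
//! initial bounds:                [{min} .. {}]
//! hard `lower` constraint {10}:  [{10} .. {}]     (storage input read frontier)
//! soft `upper` constraint {19}:  [{10} .. {19}]   (warmup constraint)
//! soft `upper` constraint {15}:  [{10} .. {15}]   (index read policy)
//! selected as-of:                {15}             (the final upper bound)
//! ```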
//!
//! # `AsOfBounds` Invariants
//!
//! Correctness requires two invariants of `AsOfBounds` of dependent collections:
//!
//!  (1) The lower bound of a collection is >= the lower bound of each of its inputs.
//!  (2) The upper bound of a collection is >= the upper bound of each of its inputs.
//!
//! Each step of the as-of selection process needs to ensure that these invariants are upheld once
//! it completes. The expectation is that each step (a) performs local changes to either the
//! `lower` _or_ the `upper` bounds of some collections and (b) invokes the appropriate
//! `propagate_bounds_*` method to restore the invariant broken by (a).
//!
//! For steps that behave as described in (a), we can prove that (b) will always succeed in
//! applying the bounds propagation constraints:
//!
//! | Let `A` and `B` be any pair of collections where `A` is an input of `B`.
//! | Before (a), both invariants are upheld, i.e. `A.lower <= B.lower` and `A.upper <= B.upper`.
//! |
//! | Case 1: (a) increases `A.lower` and/or `B.lower` to `A.lower'` and `B.lower'`
//! |     Invariant (1) might be broken, need to prove that it can be restored.
//! |     Case 1.a: `A.lower' <= B.lower'`
//! |         Invariant (1) is still upheld without propagation.
//! |     Case 1.b: `A.lower' > B.lower'`
//! |         A collection's lower bound can only be increased up to its upper bound.
//! |         Therefore, and from invariant (2): `A.lower' <= A.upper <= B.upper`
//! |         Therefore, propagation can set `B.lower' = A.lower'`, restoring invariant (1).
//! | Case 2: (a) decreases `A.upper` and/or `B.upper`
//! |     Invariant (2) might be broken, need to prove that it can be restored.
//! |     The proof is equivalent to Case 1.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

/// Runs as-of selection for the given dataflows.
///
/// Assigns the selected as-of to the provided dataflow descriptions and returns a set of
/// `ReadHold`s that must be neither dropped nor downgraded until the dataflows have been
/// installed with the compute controller.
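///
/// A sketch of the intended call pattern during coordinator bootstrap. The surrounding
/// variables and the installation step are assumptions of this example, not part of this API:
///
/// ```ignore
/// let read_holds = as_of_selection::run(
///     &mut dataflows,
///     &read_policies,
///     &*storage_collections,
///     current_time,
///     read_only_mode,
/// );
/// // Install the dataflows with the compute controller (hypothetical step), ...
/// install_dataflows(dataflows);
/// // ... and only then release the holds.
/// drop(read_holds);
/// ```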
pub fn run<T: TimestampManipulation>(
    dataflows: &mut [DataflowDescription<Plan<T>, (), T>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy<T>>,
    storage_collections: &dyn StorageCollections<Timestamp = T>,
    current_time: T,
    read_only_mode: bool,
) -> BTreeMap<GlobalId, ReadHold<T>> {
    // Get read holds for the storage inputs of the dataflows.
    // This ensures that storage frontiers don't advance past the selected as-ofs.
    let mut storage_read_holds = BTreeMap::new();
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows that sink into a storage collection that has advanced to the empty frontier don't
    // need to be installed at all. So we can apply an optimization where we prune them here and
    // assign them an empty as-of at the end.
    ctx.prune_sealed_persist_sinks();

    // During 0dt upgrades, it can happen that the leader environment drops storage collections,
    // making the read-only environment observe inconsistent read frontiers (database-issues#8836).
    // To avoid hard-constraint failures, we prune all collections depending on these dropped
    // storage collections.
    if read_only_mode {
        ctx.prune_dropped_collections();
    }

    // Apply hard constraints from upstream and downstream storage collections.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    // At this point all collections have as-of bounds that reflect what is required for
    // correctness. The current state isn't very usable though. In particular, most of the upper
    // bounds are likely to be the empty frontier, so if we selected as-ofs on this basis, the
    // resulting dataflows would never hydrate. Instead we'll apply a number of soft constraints
    // to end up in a better place.

    // Constrain collection as-ofs to times that are currently available in the inputs. This
    // ensures that dataflows can immediately start hydrating. It also ensures that dataflows don't
    // get an empty as-of, except when they exclusively depend on constant collections.
    //
    // Allowing dataflows to hydrate immediately is desirable. It allows them to complete the most
    // resource-intensive phase of their lifecycle as early as possible. Ideally we want this phase
    // to occur during a 0dt upgrade, where we still have the option to roll back if a cluster
    // doesn't come up successfully. For dataflows with a refresh schedule, hydrating early also
    // ensures that there isn't a large output delay when the refresh time is reached.
    ctx.apply_warmup_constraints();

    // Constrain as-ofs of indexes according to their read policies.
    ctx.apply_index_read_policy_constraints();

    // Constrain as-ofs of indexes to the current time. This ensures that indexes are immediately
    // readable.
    ctx.apply_index_current_time_constraints();

    // Apply the derived as-of bounds to the dataflows.
    for dataflow in dataflows {
        // `AsOfBounds` are shared between the exports of a dataflow, so looking at just the first
        // export is sufficient.
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

/// Bounds for possible as-of values of a dataflow.
#[derive(Debug)]
struct AsOfBounds<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    /// Whether these bounds can still change.
    sealed: bool,
}

impl<T: Clone> AsOfBounds<T> {
    /// Creates an `AsOfBounds` that only allows the given `frontier`.
    fn single(frontier: Antichain<T>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Get the bound of the given type.
    fn get(&self, type_: BoundType) -> &Antichain<T> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl<T: Timestamp> Default for AsOfBounds<T> {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(T::minimum()),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl<T: fmt::Debug> fmt::Display for AsOfBounds<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

/// Types of bounds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

/// Types of constraints.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints are applied to enforce correctness properties, and failing to apply them is
    /// an error.
    Hard,
    /// Soft constraints are applied to improve performance or UX, and failing to apply them is
    /// undesirable but not an error.
    Soft,
}

/// A constraint that can be applied to the `AsOfBounds` of a collection.
#[derive(Debug)]
struct Constraint<'a, T> {
    type_: ConstraintType,
    /// Which bound this constraint applies to.
    bound_type: BoundType,
    /// The frontier by which the bound should be constrained.
    frontier: &'a Antichain<T>,
    /// A short description of the reason for applying this constraint.
    ///
    /// Used only for logging.
    reason: &'a str,
}

impl<T: Timestamp> Constraint<'_, T> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns a bool indicating whether the given bounds were changed as a result.
    ///
    /// Applying a constraint can fail if the constraint frontier is incompatible with the
    /// existing bounds. In this case, the constraint still gets partially applied by moving one
    /// of the bounds up/down to the other, depending on the `bound_type`.
    ///
    /// Applying a constraint to sealed bounds is a no-op.
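    ///
    /// For illustration, a possible sequence of applications to one `AsOfBounds` (the
    /// frontiers are hypothetical):
    ///
    /// ```text
    /// bounds = [{10} .. {30}]
    /// apply Lower {20} -> Ok(true),  bounds = [{20} .. {30}]
    /// apply Upper {15} -> Err(true), bounds = [{20} .. {20}], sealed
    /// apply Upper {10} -> Ok(false), bounds unchanged (sealed)
    /// ```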
    fn apply(&self, bounds: &mut AsOfBounds<T>) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

/// State tracked for a compute collection during as-of selection.
struct Collection<'a, T> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy<T>>,
    /// The currently known as-of bounds.
    ///
    /// Shared between collections exported by the same dataflow.
    bounds: Rc<RefCell<AsOfBounds<T>>>,
    /// Whether this collection is an index.
    is_index: bool,
}

/// The as-of selection context.
struct Context<'a, T> {
    collections: BTreeMap<GlobalId, Collection<'a, T>>,
    storage_collections: &'a dyn StorageCollections<Timestamp = T>,
    current_time: T,
}

impl<'a, T: TimestampManipulation> Context<'a, T> {
    /// Initializes an as-of selection context for the given `dataflows`.
    fn new(
        dataflows: &[DataflowDescription<Plan<T>, (), T>],
        storage_collections: &'a dyn StorageCollections<Timestamp = T>,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy<T>>,
        current_time: T,
    ) -> Self {
        // Construct initial collection state for each dataflow export. Dataflows might have their
        // as-ofs already fixed, which we need to take into account when constructing `AsOfBounds`.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

    /// Returns the state of the identified collection.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection doesn't exist.
    fn expect_collection(&self, id: GlobalId) -> &Collection<'_, T> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

    /// Applies the given as-of constraint to the identified collection.
    ///
    /// Returns whether the collection's as-of bounds were changed as a result.
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint<T>) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

    /// Apply as-of constraints imposed by the frontiers of upstream storage collections.
    ///
    /// A collection's as-of _must_ be >= the read frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint to a collection is an error. The affected dataflow will
    /// not be able to hydrate successfully.
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold<T>>,
    ) {
        // Apply direct constraints from storage inputs.
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate constraints downstream, restoring `AsOfBounds` invariant (1).
        self.propagate_bounds_downstream(BoundType::Lower);
    }

    /// Apply as-of constraints imposed by the frontiers of downstream storage collections.
    ///
    /// A collection's as-of _must_ be < the write frontier of the storage collection it exports
    /// to (if any) when that storage collection is non-empty, and <= its read frontier otherwise.
    ///
    /// Rationale:
    ///
    /// * A collection's as-of must be <= the write frontier of its dependent storage collection,
    ///   because we need to pick up computing the contents of storage collections where we left
    ///   off previously, to avoid skipped times observable in the durable output.
    /// * Some dataflows feeding into storage collections (specifically: continual tasks) need to
    ///   be able to observe input changes at times they write to the output. If we selected the
    ///   as-of to be equal to the write frontier of the output storage collection, we wouldn't be
    ///   able to produce the correct output at that frontier. Thus the selected as-of must be
    ///   strictly less than the write frontier.
    /// * As an exception to the above, if the output storage collection is empty (i.e. its write
    ///   frontier is <= its read frontier), we need to allow the as-of to be equal to the read
    ///   frontier. This is correct in the sense that it mirrors the timestamp selection behavior
    ///   of the sequencer when it created the collection. Chances are that the sequencer chose the
    ///   initial as-of (and therefore the initial read frontier of the storage collection) as the
    ///   smallest possible time that can still be read from the collection inputs, so forcing the
    ///   upper bound any lower than that read frontier would produce a hard constraint violation.
    ///
    /// Failing to apply this constraint to a collection is an error. The storage collection it
    /// exports to may have times visible to readers skipped in its output, violating correctness.
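    ///
    /// For illustration (hypothetical frontiers):
    ///
    /// ```text
    /// export with read = {10}, write = {20} (non-empty):
    ///     upper bound constrained to {19}  (write frontier stepped back by one)
    /// export with read = {10}, write = {10} (empty):
    ///     upper bound constrained to {10}  (read frontier)
    /// ```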
    fn apply_downstream_storage_constraints(&self) {
        // Apply direct constraints from storage exports.
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                Antichain::from_iter(
                    frontiers
                        .write_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // Propagate constraints upstream, restoring `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure collections can hydrate immediately.
    ///
    /// A collection's as-of _should_ be < the write frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint is not an error. The affected dataflow will not be able to
    /// hydrate immediately, but it will be able to hydrate once its inputs have sufficiently
    /// advanced.
    fn apply_warmup_constraints(&self) {
        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the warmup constraint.
        for (id, write_frontier) in write_frontiers {
            let upper = step_back_frontier(&write_frontier);
            let constraint = Constraint {
                type_: ConstraintType::Soft,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!(
                    "warmup frontier derived from storage write frontier {:?}",
                    write_frontier.elements()
                ),
            };
            self.apply_constraint(id, constraint);
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes contain historical data as requested by their
    /// associated read policies.
    ///
    /// An index's as-of _should_ be <= the frontier determined by its read policy applied to its
    /// write frontier.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not contain
    /// historical times for its entire compaction window initially, but will do so once sufficient
    /// time has passed.
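    ///
    /// For illustration (hypothetical values, assuming a `lag_writes_by`-style read policy with
    /// a lag of 5 and a granularity of 1):
    ///
    /// ```text
    /// least storage input write frontier: {100} -> upper bound constrained to {95}
    /// no storage inputs at all:           the current time is used instead
    /// ```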
    fn apply_index_read_policy_constraints(&self) {
        // For the write frontier of an index, we'll use the least write frontier of its
        // (transitive) storage inputs. This is an upper bound for the write frontier the index
        // could have had before the restart. For indexes without storage inputs we use the current
        // time.

        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the read policy constraint to indexes.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes are immediately readable.
    ///
    /// An index's as-of _should_ be <= the current time.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not be readable
    /// immediately, but will be readable once sufficient time has passed.
    fn apply_index_current_time_constraints(&self) {
        // Apply the current time constraint to indexes.
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Propagate as-of bounds through the dependency graph, in the downstream direction.
    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
        // Propagating `lower` bounds downstream restores `AsOfBounds` invariant (1) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Hard,
            BoundType::Upper => ConstraintType::Soft,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);

            // Propagating `upper` bounds downstream might break `AsOfBounds` invariant (2), so we
            // need to restore it.
            if bound_type == BoundType::Upper {
                self.propagate_bounds_upstream_inner(
                    BoundType::Upper,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_downstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.compute_inputs {
                let input_collection = self.expect_collection(*input_id);
                let bounds = input_collection.bounds.borrow();
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*id, constraint);
            }
        }
    }

    /// Propagate as-of bounds through the dependency graph, in the upstream direction.
    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
        // Propagating `upper` bounds upstream restores `AsOfBounds` invariant (2) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Soft,
            BoundType::Upper => ConstraintType::Hard,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);

            // Propagating `lower` bounds upstream might break `AsOfBounds` invariant (1), so we
            // need to restore it.
            if bound_type == BoundType::Lower {
                self.propagate_bounds_downstream_inner(
                    BoundType::Lower,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_upstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in self.collections.iter().rev() {
            let bounds = collection.bounds.borrow();
            for input_id in &collection.compute_inputs {
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("downstream {id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*input_id, constraint);
            }
        }
    }

    /// Selects the "best" as-of for the identified collection, based on its currently known
    /// bounds.
    ///
    /// We simply use the upper bound here, to maximize the chances of compute reconciliation
    /// succeeding. Choosing the latest possible as-of also minimizes the amount of work the
    /// dataflow has to spend processing historical data from its sources.
    fn best_as_of(&self, id: GlobalId) -> Antichain<T> {
        if let Some(collection) = self.collections.get(&id) {
            let bounds = collection.bounds.borrow();
            bounds.upper.clone()
        } else {
            Antichain::new()
        }
    }

    /// Removes collections that sink into sealed persist shards from the context.
    ///
    /// The dataflows of these collections will get an empty default as-of assigned at the end of
    /// the as-of selection process, ensuring that they won't get installed unnecessarily.
    ///
    /// Note that it is valid to remove these collections from consideration because they don't
    /// impose as-of constraints on other compute collections.
    fn prune_sealed_persist_sinks(&mut self) {
        self.collections.retain(|id, _| {
            self.storage_collections
                .collection_frontiers(*id)
                .map_or(true, |f| !f.write_frontier.is_empty())
        });
    }

    /// Removes collections depending on storage collections with empty read frontiers.
    ///
    /// The dataflows of these collections will get an empty default as-of assigned at the end of
    /// the as-of selection process, ensuring that they won't get installed.
    ///
    /// This exists only to work around database-issues#8836.
    fn prune_dropped_collections(&mut self) {
        // Remove collections with dropped storage inputs.
        let mut pruned = BTreeSet::new();
        self.collections.retain(|id, c| {
            let input_dropped = c.storage_inputs.iter().any(|id| {
                let frontiers = self
                    .storage_collections
                    .collection_frontiers(*id)
                    .expect("storage collection exists");
                frontiers.read_capabilities.is_empty()
            });

            if input_dropped {
                pruned.insert(*id);
                false
            } else {
                true
            }
        });

        warn!(?pruned, "pruned dependants of dropped storage collections");

        // Remove (transitive) dependants of pruned collections.
        while !pruned.is_empty() {
            let pruned_inputs = std::mem::take(&mut pruned);

            self.collections.retain(|id, c| {
                if c.compute_inputs.iter().any(|id| pruned_inputs.contains(id)) {
                    pruned.insert(*id);
                    false
                } else {
                    true
                }
            });

            warn!(?pruned, "pruned collections with pruned inputs");
        }
    }
}

/// Runs `step` in a loop until it stops reporting changes.
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    loop {
        let mut changed = false;
        step(&mut changed);
        if !changed {
            break;
        }
    }
}

/// Step back the given frontier.
///
/// This method is saturating: If the frontier contains `T::minimum()` times, these are kept
/// unchanged.
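///
/// For example, with `u64` timestamps:
///
/// ```text
/// step_back_frontier(&{5}) == {4}
/// step_back_frontier(&{0}) == {0}   (minimum is kept unchanged)
/// step_back_frontier(&{})  == {}    (the empty frontier has no times to step back)
/// ```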
fn step_back_frontier<T: TimestampManipulation>(frontier: &Antichain<T>) -> Antichain<T> {
    frontier
        .iter()
        .map(|t| t.step_back().unwrap_or_else(T::minimum))
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use differential_dataflow::lattice::Lattice;
    use futures::future::BoxFuture;
    use futures::stream::BoxStream;
    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
    use mz_compute_types::sinks::ComputeSinkConnection;
    use mz_compute_types::sinks::ComputeSinkDesc;
    use mz_compute_types::sinks::MaterializedViewSinkConnection;
    use mz_compute_types::sources::SourceInstanceArguments;
    use mz_compute_types::sources::SourceInstanceDesc;
    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
    use mz_persist_types::{Codec64, ShardId};
    use mz_repr::Timestamp;
    use mz_repr::{RelationDesc, RelationVersion, Row, SqlRelationType};
    use mz_storage_client::client::TimestamplessUpdateBuilder;
    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
    use mz_storage_types::StorageDiff;
    use mz_storage_types::controller::{CollectionMetadata, StorageError};
    use mz_storage_types::errors::CollectionMissing;
    use mz_storage_types::parameters::StorageParameters;
    use mz_storage_types::sources::SourceData;
    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
    use timely::progress::Timestamp as TimelyTimestamp;

    use super::*;

    const SEALED: u64 = 0x5ea1ed;

    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
        if ts == SEALED {
            Antichain::new()
        } else {
            Antichain::from_elem(ts.into())
        }
    }

    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);

    #[async_trait]
    impl StorageCollections for StorageFrontiers {
        type Timestamp = Timestamp;

        async fn initialize_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _init_ids: BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn update_parameters(&self, _config_params: StorageParameters) {
            unimplemented!()
        }

        fn collection_metadata(
            &self,
            _id: GlobalId,
        ) -> Result<CollectionMetadata, CollectionMissing> {
            unimplemented!()
        }

        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
            unimplemented!()
        }

        fn collections_frontiers(
            &self,
            ids: Vec<GlobalId>,
        ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
            let mut frontiers = Vec::with_capacity(ids.len());
            for id in ids {
                let (read, write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
                frontiers.push(CollectionFrontiers {
                    id,
                    write_frontier: write.clone(),
                    implied_capability: read.clone(),
                    read_capabilities: read.clone(),
                })
            }
            Ok(frontiers)
        }

        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
            unimplemented!()
        }

        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_parts_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
            unimplemented!()
        }

        fn snapshot(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>
        {
            unimplemented!()
        }

        async fn snapshot_latest(
            &self,
            _id: GlobalId,
        ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn snapshot_cursor(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>,
        >
        where
            Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
        {
            unimplemented!()
        }

        fn snapshot_and_stream(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<
                BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
                StorageError<Self::Timestamp>,
            >,
        > {
            unimplemented!()
        }

        /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
        /// updates for the provided [`GlobalId`].
        fn create_update_builder(
            &self,
            _id: GlobalId,
        ) -> BoxFuture<
            'static,
            Result<
                TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
                StorageError<Self::Timestamp>,
            >,
        >
        where
            Self::Timestamp: Lattice + Codec64,
        {
            unimplemented!()
        }

        async fn prepare_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _ids_to_add: BTreeSet<GlobalId>,
            _ids_to_drop: BTreeSet<GlobalId>,
            _ids_to_register: BTreeMap<GlobalId, ShardId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn create_collections_for_bootstrap(
            &self,
            _storage_metadata: &StorageMetadata,
            _register_ts: Option<Self::Timestamp>,
            _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
            _migrated_storage_collections: &BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_table_desc(
            &self,
            _existing_collection: GlobalId,
            _new_collection: GlobalId,
            _new_desc: RelationDesc,
            _expected_version: RelationVersion,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn drop_collections_unvalidated(
            &self,
            _storage_metadata: &StorageMetadata,
            _identifiers: Vec<GlobalId>,
        ) {
            unimplemented!()
        }

        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
            unimplemented!()
        }

        fn acquire_read_holds(
            &self,
            desired_holds: Vec<GlobalId>,
        ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
            let mut holds = Vec::with_capacity(desired_holds.len());
            for id in desired_holds {
                let (read, _write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                holds.push(ReadHold::with_channel(id, read.clone(), tx));
            }
            Ok(holds)
        }

        fn determine_time_dependence(
            &self,
            _id: GlobalId,
        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
            unimplemented!()
        }

        fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
            unimplemented!()
        }
    }

    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: SqlRelationType::empty(),
                };
                (
                    id,
                    mz_compute_types::dataflows::SourceImport {
                        desc,
                        monotonic: Default::default(),
                        with_snapshot: true,
                        upper: Default::default(),
                    },
                )
            })
            .collect();
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: SqlRelationType::empty(),
                    monotonic: Default::default(),
                    with_snapshot: true,
                };
                (id, import)
            })
            .collect();
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = SqlRelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }

    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
            $( read_only: $read_only:expr, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                #[allow(unused_variables)]
                let read_only = false;
                $( let read_only = $read_only; )?

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                    read_only,
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }

    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"]       => 10,
            "u2" <- ["s2"]       => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 14,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"]       => 19,
            "u2" <- ["s2"]       => 19,
            "u3" <- ["s3"]       => 39,
            "u4" <- ["s4"]       => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- []     => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- []     => 15,
        ],
        current_time: 15,
    });

    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });

    testcase!(read_only_dropped_storage_inputs, {
        storage: {
            "s1": (10, 20),
            "s2": (SEALED, SEALED),
            "u4": (10, 20),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => SEALED,
            "u3" <- ["s1", "s2"] => SEALED,
            "u4" <- ["u2"] => SEALED,
        ],
        current_time: 15,
        read_only: true,
    });

    // Regression test for database-issues#9273.
    testcase!(github_9273, {
        storage: {
            "s1": (10, 20),
            "u3": (14, 15),
        },
        dataflows: [
            "u1" <- ["s1"] => 14,
            "u2" <- ["u1"] => 19,
            "u3" <- ["u1"] => 14,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(1.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(1.into(), 1.into()),
        },
    });
}