mz_compute_client/as_of_selection.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for selecting as-ofs of compute dataflows during system initialization.
//!
//! The functionality implemented here is invoked by the coordinator during its bootstrap process.
//! Ideally, it would be part of the controller and transparent to the coordinator, but that's
//! difficult to reconcile with the current controller API. For now, we still make the coordinator
//! worry about as-of selection but keep the implementation in a compute crate because it really is
//! a compute implementation concern.
//!
//! The as-of selection process takes a list of `DataflowDescription`s, determines compatible
//! as-ofs for the compute collections they export, and augments the `DataflowDescription`s with
//! these as-ofs.
//!
//! For each compute collection, the as-of selection process keeps an `AsOfBounds` instance that
//! tracks a lower and an upper bound for the as-of the collection may get assigned. Throughout the
//! process, a collection's `AsOfBounds` get repeatedly refined, by increasing the lower bound and
//! decreasing the upper bound. The final upper bound is then used for the collection as-of. Using
//! the upper bound maximizes the chances of compute reconciliation being effective, and minimizes
//! the amount of historical data that must be read from the dataflow sources.
//!
//! Refinement of `AsOfBounds` is performed by applying `Constraint`s to collections. A
//! `Constraint` specifies which bound should be refined to which frontier. A `Constraint` may be
//! "hard" or "soft", which determines how failure to apply it is handled. Failing to apply a hard
//! constraint is treated as an error; failing to apply a soft constraint is not. If a constraint
//! fails to apply, the respective `AsOfBounds` are refined as much as possible (to a single
//! frontier) and marked as "sealed". Subsequent constraint applications against the sealed bounds
//! are no-ops. This is done to avoid log noise from repeated constraint application failures.
//!
//! Note that failing to apply a hard constraint does not abort the as-of selection process for the
//! affected collection. Instead the failure is handled gracefully by logging an error and
//! assigning the collection a best-effort as-of. This is done, rather than panicking or returning
//! an error and letting the coordinator panic, to ensure the availability of the system. Ideally,
//! we would instead mark the affected dataflow as failed/poisoned, but such a mechanism doesn't
//! currently exist.
//!
//! The as-of selection process applies constraints in order of importance, because once a
//! constraint application fails, the respective `AsOfBounds` are sealed and later applications
//! won't have any effect. This means hard constraints must be applied before soft constraints, and
//! more desirable soft constraints should be applied before less desirable ones.
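//!
//! For example (an illustrative walkthrough of the mechanism, not a trace of a real bootstrap):
//! a collection starts out with the default bounds, i.e. the minimum frontier as lower bound and
//! the empty frontier as upper bound. Applying a hard lower-bound constraint at time 10 and then
//! a soft upper-bound constraint at time 20 refines the bounds to `[10 .. 20]`, so the collection
//! would be assigned the as-of 20. A subsequent upper-bound constraint at time 5 cannot be
//! applied, since 5 lies below the lower bound; the bounds instead collapse to `[10 .. 10]` and
//! are sealed, and all later constraint applications are ignored.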
//!
//! # `AsOfBounds` Invariants
//!
//! Correctness requires two invariants of `AsOfBounds` of dependent collections:
//!
//!  (1) The lower bound of a collection is >= the lower bound of each of its inputs.
//!  (2) The upper bound of a collection is >= the upper bound of each of its inputs.
//!
//! Each step of the as-of selection process needs to ensure that these invariants are upheld once
//! it completes. The expectation is that each step (a) performs local changes to either the
//! `lower` _or_ the `upper` bounds of some collections and (b) invokes the appropriate
//! `propagate_bounds_*` method to restore the invariant broken by (a).
//!
//! For steps that behave as described in (a), we can prove that (b) will always succeed in
//! applying the bounds propagation constraints:
//!
//! | Let `A` and `B` be any pair of collections where `A` is an input of `B`.
//! | Before (a), both invariants are upheld, i.e. `A.lower <= B.lower` and `A.upper <= B.upper`.
//! |
//! | Case 1: (a) increases `A.lower` and/or `B.lower` to `A.lower'` and `B.lower'`
//! |     Invariant (1) might be broken, need to prove that it can be restored.
//! |     Case 1.a: `A.lower' <= B.lower'`
//! |         Invariant (1) is still upheld without propagation.
//! |     Case 1.b: `A.lower' > B.lower'`
//! |         A collection's lower bound can only be increased up to its upper bound.
//! |         Therefore, and from invariant (2): `A.lower' <= A.upper <= B.upper`
//! |         Therefore, propagation can set `B.lower' = A.lower'`, restoring invariant (1).
//! | Case 2: (a) decreases `A.upper` and/or `B.upper`
//! |     Invariant (2) might be broken, need to prove that it can be restored.
//! |     The proof is equivalent to Case 1.
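//! |
//! | Concretely (an illustrative instance of Case 1.b): let `A` have bounds `[5 .. 15]` and `B`
//! | have bounds `[5 .. 20]`. If a step raises `A.lower` to 12, invariant (1) is broken. Since
//! | `A.lower' = 12 <= A.upper = 15 <= B.upper = 20`, propagation can raise `B.lower` to 12
//! | without exceeding `B.upper`, restoring invariant (1).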

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

/// Runs as-of selection for the given dataflows.
///
/// Assigns the selected as-of to the provided dataflow descriptions and returns a set of
/// `ReadHold`s that must not be dropped nor downgraded until the dataflows have been installed
/// with the compute controller.
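///
/// A minimal calling sketch (hypothetical surrounding names, error handling elided):
///
/// ```ignore
/// let read_holds = as_of_selection::run(
///     &mut dataflows,
///     &read_policies,
///     &*storage_collections,
///     current_time,
///     read_only_mode,
/// );
/// // Install `dataflows` with the compute controller before dropping `read_holds`.
/// ```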
pub fn run<T: TimestampManipulation>(
    dataflows: &mut [DataflowDescription<Plan<T>, (), T>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy<T>>,
    storage_collections: &dyn StorageCollections<Timestamp = T>,
    current_time: T,
    read_only_mode: bool,
) -> BTreeMap<GlobalId, ReadHold<T>> {
    // Get read holds for the storage inputs of the dataflows.
    // This ensures that storage frontiers don't advance past the selected as-ofs.
    let mut storage_read_holds = BTreeMap::new();
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows that sink into a storage collection that has advanced to the empty frontier don't
    // need to be installed at all. So we can apply an optimization where we prune them here and
    // assign them an empty as-of at the end.
    ctx.prune_sealed_persist_sinks();

    // During 0dt upgrades, it can happen that the leader environment drops storage collections,
    // making the read-only environment observe inconsistent read frontiers (database-issues#8836).
    // To avoid hard-constraint failures, we prune all collections depending on these dropped
    // storage collections.
    if read_only_mode {
        ctx.prune_dropped_collections();
    }

    // Apply hard constraints from upstream and downstream storage collections.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    // At this point all collections have as-of bounds that reflect what is required for
    // correctness. The current state isn't very usable though. In particular, most of the upper
    // bounds are likely to be the empty frontier, so if we selected as-ofs on this basis, the
    // resulting dataflows would never hydrate. Instead we'll apply a number of soft constraints to
    // end up in a better place.

    // Constrain collection as-ofs to times that are currently available in the inputs. This
    // ensures that dataflows can immediately start hydrating. It also ensures that dataflows don't
    // get an empty as-of, except when they exclusively depend on constant collections.
    //
    // Allowing dataflows to hydrate immediately is desirable. It allows them to complete the most
    // resource-intensive phase of their lifecycle as early as possible. Ideally we want this phase
    // to occur during a 0dt upgrade, where we still have the option to roll back if a cluster
    // doesn't come up successfully. For dataflows with a refresh schedule, hydrating early also
    // ensures that there isn't a large output delay when the refresh time is reached.
    ctx.apply_warmup_constraints();

    // Constrain as-ofs of indexes according to their read policies.
    ctx.apply_index_read_policy_constraints();

    // Constrain as-ofs of indexes to the current time. This ensures that indexes are immediately
    // readable.
    ctx.apply_index_current_time_constraints();

    // Apply the derived as-of bounds to the dataflows.
    for dataflow in dataflows {
        // `AsOfBounds` are shared between the exports of a dataflow, so looking at just the first
        // export is sufficient.
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

/// Bounds for possible as-of values of a dataflow.
#[derive(Debug)]
struct AsOfBounds<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    /// Whether these bounds can still change.
    sealed: bool,
}

impl<T: Clone> AsOfBounds<T> {
    /// Creates an `AsOfBounds` that only allows the given `frontier`.
    fn single(frontier: Antichain<T>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Get the bound of the given type.
    fn get(&self, type_: BoundType) -> &Antichain<T> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl<T: Timestamp> Default for AsOfBounds<T> {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(T::minimum()),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl<T: fmt::Debug> fmt::Display for AsOfBounds<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

/// Types of bounds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

/// Types of constraints.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints are applied to enforce correctness properties, and failing to apply them is
    /// an error.
    Hard,
    /// Soft constraints are applied to improve performance or UX, and failing to apply them is
    /// undesirable but not an error.
    Soft,
}

/// A constraint that can be applied to the `AsOfBounds` of a collection.
#[derive(Debug)]
struct Constraint<'a, T> {
    type_: ConstraintType,
    /// Which bound this constraint applies to.
    bound_type: BoundType,
    /// The frontier by which the bound should be constrained.
    frontier: &'a Antichain<T>,
    /// A short description of the reason for applying this constraint.
    ///
    /// Used only for logging.
    reason: &'a str,
}

impl<T: Timestamp> Constraint<'_, T> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns a bool indicating whether the given bounds were changed as a result.
    ///
    /// Applying a constraint can fail if the constraint frontier is incompatible with the
    /// existing bounds. In this case, the constraint still gets partially applied by moving one of
    /// the bounds up/down to the other, depending on the `bound_type`.
    ///
    /// Applying a constraint to sealed bounds is a no-op.
    fn apply(&self, bounds: &mut AsOfBounds<T>) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

/// State tracked for a compute collection during as-of selection.
struct Collection<'a, T> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy<T>>,
    /// The currently known as-of bounds.
    ///
    /// Shared between collections exported by the same dataflow.
    bounds: Rc<RefCell<AsOfBounds<T>>>,
    /// Whether this collection is an index.
    is_index: bool,
}

/// The as-of selection context.
struct Context<'a, T> {
    collections: BTreeMap<GlobalId, Collection<'a, T>>,
    storage_collections: &'a dyn StorageCollections<Timestamp = T>,
    current_time: T,
}

impl<'a, T: TimestampManipulation> Context<'a, T> {
    /// Initializes an as-of selection context for the given `dataflows`.
    fn new(
        dataflows: &[DataflowDescription<Plan<T>, (), T>],
        storage_collections: &'a dyn StorageCollections<Timestamp = T>,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy<T>>,
        current_time: T,
    ) -> Self {
        // Construct initial collection state for each dataflow export. Dataflows might have their
        // as-ofs already fixed, which we need to take into account when constructing `AsOfBounds`.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

    /// Returns the state of the identified collection.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection doesn't exist.
    fn expect_collection(&self, id: GlobalId) -> &Collection<'_, T> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

    /// Applies the given as-of constraint to the identified collection.
    ///
    /// Returns whether the collection's as-of bounds were changed as a result.
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint<T>) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

    /// Apply as-of constraints imposed by the frontiers of upstream storage collections.
    ///
    /// A collection's as-of _must_ be >= the read frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint to a collection is an error. The affected dataflow will
    /// not be able to hydrate successfully.
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold<T>>,
    ) {
        // Apply direct constraints from storage inputs.
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate constraints downstream, restoring `AsOfBounds` invariant (1).
        self.propagate_bounds_downstream(BoundType::Lower);
    }

    /// Apply as-of constraints imposed by the frontiers of downstream storage collections.
    ///
    /// A collection's as-of _must_ be < the write frontier of the storage collection it exports to
    /// (if any) if it is non-empty, and <= the storage collection's read frontier otherwise.
    ///
    /// Rationale:
    ///
    /// * A collection's as-of must be <= the write frontier of its dependent storage collection,
    ///   because we need to pick up computing the contents of storage collections where we left
    ///   off previously, to avoid skipped times observable in the durable output.
    /// * Some dataflows feeding into storage collections (specifically: continual tasks) need to
    ///   be able to observe input changes at times they write to the output. If we selected the
    ///   as-of to be equal to the write frontier of the output storage collection, we wouldn't be
    ///   able to produce the correct output at that frontier. Thus the selected as-of must be
    ///   strictly less than the write frontier.
    /// * As an exception to the above, if the output storage collection is empty (i.e. its write
    ///   frontier is <= its read frontier), we need to allow the as-of to be equal to the read
    ///   frontier. This is correct in the sense that it mirrors the timestamp selection behavior
    ///   of the sequencer when it created the collection. Chances are that the sequencer chose the
    ///   initial as-of (and therefore the initial read frontier of the storage collection) as the
    ///   smallest possible time that can still be read from the collection inputs, so forcing the
    ///   upper bound any lower than that read frontier would produce a hard constraint violation.
    ///
    /// Failing to apply this constraint to a collection is an error. The storage collection it
    /// exports to may have times visible to readers skipped in its output, violating correctness.
    fn apply_downstream_storage_constraints(&self) {
        // Apply direct constraints from storage exports.
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                Antichain::from_iter(
                    frontiers
                        .write_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // Propagate constraints upstream, restoring `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure collections can hydrate immediately.
    ///
    /// A collection's as-of _should_ be < the write frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint is not an error. The affected dataflow will not be able to
    /// hydrate immediately, but it will be able to hydrate once its inputs have sufficiently
    /// advanced.
    fn apply_warmup_constraints(&self) {
        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the warmup constraint.
        for (id, write_frontier) in write_frontiers {
            let upper = step_back_frontier(&write_frontier);
            let constraint = Constraint {
                type_: ConstraintType::Soft,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!(
                    "warmup frontier derived from storage write frontier {:?}",
                    write_frontier.elements()
                ),
            };
            self.apply_constraint(id, constraint);
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes contain historical data as requested by their
    /// associated read policies.
    ///
    /// An index's as-of _should_ be <= the frontier determined by its read policy applied to its
    /// write frontier.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not contain
    /// historical times for its entire compaction window initially, but will do so once sufficient
    /// time has passed.
    fn apply_index_read_policy_constraints(&self) {
        // For the write frontier of an index, we'll use the least write frontier of its
        // (transitive) storage inputs. This is an upper bound for the write frontier the index
        // could have had before the restart. For indexes without storage inputs we use the current
        // time.

        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the read policy constraint to indexes.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes are immediately readable.
    ///
    /// An index's as-of _should_ be <= the current time.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not be readable
    /// immediately, but will be readable once sufficient time has passed.
    fn apply_index_current_time_constraints(&self) {
        // Apply the current time constraint to indexes.
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Propagate as-of bounds through the dependency graph, in downstream direction.
    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
        // Propagating `lower` bounds downstream restores `AsOfBounds` invariant (1) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Hard,
            BoundType::Upper => ConstraintType::Soft,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);

            // Propagating `upper` bounds downstream might break `AsOfBounds` invariant (2), so we
            // need to restore it.
            if bound_type == BoundType::Upper {
                self.propagate_bounds_upstream_inner(
                    BoundType::Upper,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_downstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.compute_inputs {
                let input_collection = self.expect_collection(*input_id);
                let bounds = input_collection.bounds.borrow();
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*id, constraint);
            }
        }
    }

    /// Propagate as-of bounds through the dependency graph, in upstream direction.
    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
        // Propagating `upper` bounds upstream restores `AsOfBounds` invariant (2) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Soft,
            BoundType::Upper => ConstraintType::Hard,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);

            // Propagating `lower` bounds upstream might break `AsOfBounds` invariant (1), so we
            // need to restore it.
            if bound_type == BoundType::Lower {
                self.propagate_bounds_downstream_inner(
                    BoundType::Lower,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_upstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in self.collections.iter().rev() {
            let bounds = collection.bounds.borrow();
            for input_id in &collection.compute_inputs {
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("downstream {id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*input_id, constraint);
            }
        }
    }

    /// Selects the "best" as-of for the identified collection, based on its currently known
    /// bounds.
    ///
    /// We simply use the upper bound here, to maximize the chances of compute reconciliation
    /// succeeding. Choosing the latest possible as-of also minimizes the amount of work the
    /// dataflow has to spend processing historical data from its sources.
    fn best_as_of(&self, id: GlobalId) -> Antichain<T> {
        if let Some(collection) = self.collections.get(&id) {
            let bounds = collection.bounds.borrow();
            bounds.upper.clone()
        } else {
            Antichain::new()
        }
    }

    /// Removes collections that sink into sealed persist shards from the context.
    ///
    /// The dataflows of these collections will get an empty default as-of assigned at the end of
    /// the as-of selection process, ensuring that they won't get installed unnecessarily.
    ///
    /// Note that it is valid to remove these collections from consideration because they don't
    /// impose as-of constraints on other compute collections.
    fn prune_sealed_persist_sinks(&mut self) {
        self.collections.retain(|id, _| {
            self.storage_collections
                .collection_frontiers(*id)
                .map_or(true, |f| !f.write_frontier.is_empty())
        });
    }

    /// Removes collections depending on storage collections with empty read frontiers.
    ///
    /// The dataflows of these collections will get an empty default as-of assigned at the end of
    /// the as-of selection process, ensuring that they won't get installed.
    ///
    /// This exists only to work around database-issues#8836.
    fn prune_dropped_collections(&mut self) {
        // Remove collections with dropped storage inputs.
        let mut pruned = BTreeSet::new();
        self.collections.retain(|id, c| {
            let input_dropped = c.storage_inputs.iter().any(|id| {
                let frontiers = self
                    .storage_collections
                    .collection_frontiers(*id)
                    .expect("storage collection exists");
                frontiers.read_capabilities.is_empty()
            });

            if input_dropped {
                pruned.insert(*id);
                false
            } else {
                true
            }
        });

        warn!(?pruned, "pruned dependants of dropped storage collections");

        // Remove (transitive) dependants of pruned collections.
        while !pruned.is_empty() {
            let pruned_inputs = std::mem::take(&mut pruned);

            self.collections.retain(|id, c| {
                if c.compute_inputs.iter().any(|id| pruned_inputs.contains(id)) {
                    pruned.insert(*id);
                    false
                } else {
                    true
                }
            });

            warn!(?pruned, "pruned collections with pruned inputs");
        }
    }
}

/// Runs `step` in a loop until it stops reporting changes.
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    loop {
        let mut changed = false;
        step(&mut changed);
        if !changed {
            break;
        }
    }
}

/// Step back the given frontier.
///
/// This method is saturating: If the frontier contains `T::minimum()` times, these are kept
/// unchanged.
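///
/// For example (illustrative): stepping back the frontier `{10}` yields `{9}`, while stepping
/// back `{T::minimum()}` yields `{T::minimum()}` again.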
fn step_back_frontier<T: TimestampManipulation>(frontier: &Antichain<T>) -> Antichain<T> {
    frontier
        .iter()
        .map(|t| t.step_back().unwrap_or_else(T::minimum))
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use differential_dataflow::lattice::Lattice;
    use futures::future::BoxFuture;
    use futures::stream::BoxStream;
    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
    use mz_compute_types::sinks::{
        ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
    };
    use mz_compute_types::sources::{SourceInstanceArguments, SourceInstanceDesc};
    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
    use mz_persist_types::{Codec64, ShardId};
    use mz_repr::{RelationDesc, RelationVersion, Row, SqlRelationType, Timestamp};
    use mz_storage_client::client::TimestamplessUpdateBuilder;
    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
    use mz_storage_types::StorageDiff;
    use mz_storage_types::controller::{CollectionMetadata, StorageError};
    use mz_storage_types::errors::CollectionMissing;
    use mz_storage_types::parameters::StorageParameters;
    use mz_storage_types::sources::SourceData;
    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
    use timely::progress::Timestamp as TimelyTimestamp;

    use super::*;

    const SEALED: u64 = 0x5ea1ed;

    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
        if ts == SEALED {
            Antichain::new()
        } else {
            Antichain::from_elem(ts.into())
        }
    }

    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);

    #[async_trait]
    impl StorageCollections for StorageFrontiers {
        type Timestamp = Timestamp;

        async fn initialize_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _init_ids: BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn update_parameters(&self, _config_params: StorageParameters) {
            unimplemented!()
        }

        fn collection_metadata(
            &self,
            _id: GlobalId,
        ) -> Result<CollectionMetadata, CollectionMissing> {
            unimplemented!()
        }

        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
            unimplemented!()
        }

        fn collections_frontiers(
            &self,
            ids: Vec<GlobalId>,
        ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
            let mut frontiers = Vec::with_capacity(ids.len());
            for id in ids {
                let (read, write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
                frontiers.push(CollectionFrontiers {
                    id,
                    write_frontier: write.clone(),
                    implied_capability: read.clone(),
                    read_capabilities: read.clone(),
                })
            }
            Ok(frontiers)
        }

        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
            unimplemented!()
        }

        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_parts_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
            unimplemented!()
        }

        fn snapshot(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>
        {
            unimplemented!()
        }

        async fn snapshot_latest(
            &self,
            _id: GlobalId,
        ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn snapshot_cursor(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>,
        >
        where
            Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
        {
            unimplemented!()
        }

        fn snapshot_and_stream(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<
                BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
                StorageError<Self::Timestamp>,
            >,
        > {
            unimplemented!()
        }

        /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
        /// updates for the provided [`GlobalId`].
        fn create_update_builder(
            &self,
            _id: GlobalId,
        ) -> BoxFuture<
            'static,
            Result<
                TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
                StorageError<Self::Timestamp>,
            >,
        >
        where
            Self::Timestamp: Lattice + Codec64,
        {
            unimplemented!()
        }

        async fn prepare_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _ids_to_add: BTreeSet<GlobalId>,
            _ids_to_drop: BTreeSet<GlobalId>,
            _ids_to_register: BTreeMap<GlobalId, ShardId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn create_collections_for_bootstrap(
            &self,
            _storage_metadata: &StorageMetadata,
            _register_ts: Option<Self::Timestamp>,
            _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
            _migrated_storage_collections: &BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_table_desc(
            &self,
            _existing_collection: GlobalId,
            _new_collection: GlobalId,
            _new_desc: RelationDesc,
            _expected_version: RelationVersion,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn drop_collections_unvalidated(
            &self,
            _storage_metadata: &StorageMetadata,
            _identifiers: Vec<GlobalId>,
        ) {
            unimplemented!()
        }

        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
            unimplemented!()
        }

        fn acquire_read_holds(
            &self,
            desired_holds: Vec<GlobalId>,
        ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
            let mut holds = Vec::with_capacity(desired_holds.len());
            for id in desired_holds {
                let (read, _write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                holds.push(ReadHold::with_channel(id, read.clone(), tx));
            }
            Ok(holds)
        }

        fn determine_time_dependence(
            &self,
            _id: GlobalId,
        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
            unimplemented!()
        }

        fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
            unimplemented!()
        }
    }

    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: SqlRelationType::empty(),
                };
                (id, (desc, Default::default(), Default::default()))
            })
            .collect();
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: SqlRelationType::empty(),
                    monotonic: Default::default(),
                };
                (id, import)
            })
            .collect();
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = SqlRelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }

    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
            $( read_only: $read_only:expr, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                #[allow(unused_variables)]
                let read_only = false;
                $( let read_only = $read_only; )?

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                    read_only,
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }
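
    // How to read a test case: `"u1" <- ["s1"] => 10` declares a dataflow exporting `u1` with the
    // single input `s1`, and asserts that as-of selection assigns it the as-of frontier `{10}`.
    // Inputs listed under `storage` are storage collections with the given `(read, write)`
    // frontiers; all other inputs are index imports. `SEALED` stands for the empty frontier.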

    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"]       => 10,
            "u2" <- ["s2"]       => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 14,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"]       => 19,
            "u2" <- ["s2"]       => 19,
            "u3" <- ["s3"]       => 39,
            "u4" <- ["s4"]       => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- []     => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- []     => 15,
        ],
        current_time: 15,
    });

    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });

    testcase!(read_only_dropped_storage_inputs, {
        storage: {
            "s1": (10, 20),
            "s2": (SEALED, SEALED),
            "u4": (10, 20),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => SEALED,
            "u3" <- ["s1", "s2"] => SEALED,
            "u4" <- ["u2"] => SEALED,
        ],
        current_time: 15,
        read_only: true,
    });

    // Regression test for database-issues#9273.
    testcase!(github_9273, {
        storage: {
            "s1": (10, 20),
            "u3": (14, 15),
        },
        dataflows: [
            "u1" <- ["s1"] => 14,
            "u2" <- ["u1"] => 19,
            "u3" <- ["u1"] => 14,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(1.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(1.into(), 1.into()),
        },
    });
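
    // A direct check of `Constraint::apply` semantics, added as an illustration of the sealing
    // behavior described in the module docs: a constraint that cannot be applied collapses the
    // bounds and seals them, and sealed bounds ignore all further constraints.
    #[mz_ore::test]
    fn constraint_apply_seals_bounds_on_failure() {
        let mut bounds: AsOfBounds<Timestamp> = AsOfBounds {
            lower: Antichain::from_elem(10.into()),
            upper: Antichain::from_elem(20.into()),
            sealed: false,
        };

        // An upper-bound constraint below the lower bound fails to apply. The upper bound is
        // still moved down to the lower bound, and the bounds get sealed.
        let frontier = Antichain::from_elem(5.into());
        let constraint = Constraint {
            type_: ConstraintType::Soft,
            bound_type: BoundType::Upper,
            frontier: &frontier,
            reason: "test",
        };
        assert_eq!(constraint.apply(&mut bounds), Err(true));
        assert_eq!(bounds.upper, bounds.lower);
        assert!(bounds.sealed);

        // Applying any constraint to sealed bounds is a no-op.
        assert_eq!(constraint.apply(&mut bounds), Ok(false));
    }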
}