mz_compute_client/as_of_selection.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for selecting as-ofs of compute dataflows during system initialization.
//!
//! The functionality implemented here is invoked by the coordinator during its bootstrap process.
//! Ideally, it would be part of the controller and transparent to the coordinator, but that's
//! difficult to reconcile with the current controller API. For now, we still make the coordinator
//! worry about as-of selection but keep the implementation in a compute crate because it really is
//! a compute implementation concern.
//!
//! The as-of selection process takes a list of `DataflowDescription`s, determines compatible
//! as-ofs for the compute collections they export, and augments the `DataflowDescription`s with
//! these as-ofs.
//!
//! For each compute collection, the as-of selection process keeps an `AsOfBounds` instance that
//! tracks a lower and an upper bound for the as-of the collection may get assigned. Throughout the
//! process, a collection's `AsOfBounds` get repeatedly refined, by increasing the lower bound and
//! decreasing the upper bound. The final upper bound is then used for the collection as-of. Using
//! the upper bound maximizes the chances of compute reconciliation being effective, and minimizes
//! the amount of historical data that must be read from the dataflow sources.
//!
//! Refinement of `AsOfBounds` is performed by applying `Constraint`s to collections. A
//! `Constraint` specifies which bound should be refined to which frontier. A `Constraint` may be
//! "hard" or "soft", which determines how failure to apply it is handled. Failing to apply a hard
//! constraint is treated as an error; failing to apply a soft constraint is not. If a constraint
//! fails to apply, the respective `AsOfBounds` are refined as much as possible (to a single
//! frontier) and marked as "sealed". Subsequent constraint applications against the sealed bounds
//! are no-ops. This is done to avoid log noise from repeated constraint application failures.
//!
//! Note that failing to apply a hard constraint does not abort the as-of selection process for the
//! affected collection. Instead the failure is handled gracefully by logging an error and
//! assigning the collection a best-effort as-of. This is done, rather than panicking or returning
//! an error and letting the coordinator panic, to ensure the availability of the system. Ideally,
//! we would instead mark the affected dataflow as failed/poisoned, but such a mechanism doesn't
//! currently exist.
//!
//! The as-of selection process applies constraints in order of importance, because once a
//! constraint application fails, the respective `AsOfBounds` are sealed and later applications
//! won't have any effect. This means hard constraints must be applied before soft constraints, and
//! more desirable soft constraints should be applied before less desirable ones.
//!
//! # `AsOfBounds` Invariants
//!
//! Correctness requires two invariants of `AsOfBounds` of dependent collections:
//!
//!  (1) The lower bound of a collection is >= the lower bound of each of its inputs.
//!  (2) The upper bound of a collection is >= the upper bound of each of its inputs.
//!
//! Each step of the as-of selection process needs to ensure that these invariants are upheld once
//! it completes. The expectation is that each step (a) performs local changes to either the
//! `lower` _or_ the `upper` bounds of some collections and (b) invokes the appropriate
//! `propagate_bounds_*` method to restore the invariant broken by (a).
//!
//! For steps that behave as described in (a), we can prove that (b) will always succeed in
//! applying the bounds propagation constraints:
//!
//! | Let `A` and `B` be any pair of collections where `A` is an input of `B`.
//! | Before (a), both invariants are upheld, i.e. `A.lower <= B.lower` and `A.upper <= B.upper`.
//! |
//! | Case 1: (a) increases `A.lower` and/or `B.lower` to `A.lower'` and `B.lower'`
//! |     Invariant (1) might be broken, need to prove that it can be restored.
//! |     Case 1.a: `A.lower' <= B.lower'`
//! |         Invariant (1) is still upheld without propagation.
//! |     Case 1.b: `A.lower' > B.lower'`
//! |         A collection's lower bound can only be increased up to its upper bound.
//! |         Therefore, and from invariant (2): `A.lower' <= A.upper <= B.upper`
//! |         Therefore, propagation can set `B.lower' = A.lower'`, restoring invariant (1).
//! | Case 2: (a) decreases `A.upper` and/or `B.upper`
//! |     Invariant (2) might be broken, need to prove that it can be restored.
//! |     The proof is equivalent to Case 1.
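//!
//! To make the bounds refinement concrete, here is a small worked example with hypothetical
//! collections and frontiers: `s1` is a storage input, `u1` an index on `s1`, `u2` an index on
//! `u1`, the current time is 100, and no read policies are set.
//!
//! ```text
//! s1: read frontier = {10}, write frontier = {20}
//!
//! initial bounds:                 u1: [{0} .. {}]     u2: [{0} .. {}]
//! upstream storage constraint:    u1: [{10} .. {}]    u2: [{10} .. {}]
//! warmup constraint (step back):  u1: [{10} .. {19}]  u2: [{10} .. {19}]
//! index current-time constraint:  no effect, {19} is already <= {100}
//!
//! selected as-ofs (upper bounds): u1 = {19}, u2 = {19}
//! ```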

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::rc::Rc;

use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

/// Runs as-of selection for the given dataflows.
///
/// Assigns the selected as-ofs to the provided dataflow descriptions and returns a set of
/// `ReadHold`s that must be neither dropped nor downgraded until the dataflows have been
/// installed with the compute controller.
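///
/// A hedged usage sketch from the coordinator's side (the surrounding names are hypothetical,
/// so this is not a doctest):
///
/// ```ignore
/// let storage_read_holds = as_of_selection::run(
///     &mut dataflows,
///     &read_policies,
///     &*storage_collections,
///     current_time,
///     read_only_mode,
/// );
/// install_dataflows(dataflows)?;
/// // Only now is it safe to release the holds.
/// drop(storage_read_holds);
/// ```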
pub fn run<T: TimestampManipulation>(
    dataflows: &mut [DataflowDescription<Plan<T>, (), T>],
    read_policies: &BTreeMap<GlobalId, ReadPolicy<T>>,
    storage_collections: &dyn StorageCollections<Timestamp = T>,
    current_time: T,
    read_only_mode: bool,
) -> BTreeMap<GlobalId, ReadHold<T>> {
    // Get read holds for the storage inputs of the dataflows.
    // This ensures that storage frontiers don't advance past the selected as-ofs.
    let mut storage_read_holds = BTreeMap::new();
    for dataflow in &*dataflows {
        for id in dataflow.source_imports.keys() {
            if !storage_read_holds.contains_key(id) {
                let read_hold = storage_collections
                    .acquire_read_holds(vec![*id])
                    .expect("storage collection exists")
                    .into_element();
                storage_read_holds.insert(*id, read_hold);
            }
        }
    }

    let mut ctx = Context::new(dataflows, storage_collections, read_policies, current_time);

    // Dataflows that sink into a storage collection that has advanced to the empty frontier don't
    // need to be installed at all. So we can apply an optimization where we prune them here and
    // assign them an empty as-of at the end.
    ctx.prune_sealed_persist_sinks();

    // During 0dt upgrades, it can happen that the leader environment drops storage collections,
    // making the read-only environment observe inconsistent read frontiers (database-issues#8836).
    // To avoid hard-constraint failures, we prune all collections depending on these dropped
    // storage collections.
    if read_only_mode {
        ctx.prune_dropped_collections();
    }

    // Apply hard constraints from upstream and downstream storage collections.
    ctx.apply_upstream_storage_constraints(&storage_read_holds);
    ctx.apply_downstream_storage_constraints();

    // At this point all collections have as-of bounds that reflect what is required for
    // correctness. The current state isn't very usable, though. In particular, most of the upper
    // bounds are likely to be the empty frontier, so if we selected as-ofs on this basis, the
    // resulting dataflows would never hydrate. Instead we'll apply a number of soft constraints to
    // end up in a better place.

    // Constrain collection as-ofs to times that are currently available in the inputs. This
    // ensures that dataflows can immediately start hydrating. It also ensures that dataflows don't
    // get an empty as-of, except when they exclusively depend on constant collections.
    //
    // Allowing dataflows to hydrate immediately is desirable. It allows them to complete the most
    // resource-intensive phase of their lifecycle as early as possible. Ideally we want this phase
    // to occur during a 0dt upgrade, where we still have the option to roll back if a cluster
    // doesn't come up successfully. For dataflows with a refresh schedule, hydrating early also
    // ensures that there isn't a large output delay when the refresh time is reached.
    ctx.apply_warmup_constraints();

    // Constrain as-ofs of indexes according to their read policies.
    ctx.apply_index_read_policy_constraints();

    // Constrain as-ofs of indexes to the current time. This ensures that indexes are immediately
    // readable.
    ctx.apply_index_current_time_constraints();

    // Apply the derived as-of bounds to the dataflows.
    for dataflow in dataflows {
        // `AsOfBounds` are shared between the exports of a dataflow, so looking at just the first
        // export is sufficient.
        let first_export = dataflow.export_ids().next();
        let as_of = first_export.map_or_else(Antichain::new, |id| ctx.best_as_of(id));
        dataflow.as_of = Some(as_of);
    }

    storage_read_holds
}

/// Bounds for possible as-of values of a dataflow.
#[derive(Debug)]
struct AsOfBounds<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    /// Whether these bounds are sealed and can no longer change.
    sealed: bool,
}

impl<T: Clone> AsOfBounds<T> {
    /// Creates an `AsOfBounds` that only allows the given `frontier`.
    fn single(frontier: Antichain<T>) -> Self {
        Self {
            lower: frontier.clone(),
            upper: frontier,
            sealed: false,
        }
    }

    /// Get the bound of the given type.
    fn get(&self, type_: BoundType) -> &Antichain<T> {
        match type_ {
            BoundType::Lower => &self.lower,
            BoundType::Upper => &self.upper,
        }
    }
}

impl<T: Timestamp> Default for AsOfBounds<T> {
    fn default() -> Self {
        Self {
            lower: Antichain::from_elem(T::minimum()),
            upper: Antichain::new(),
            sealed: false,
        }
    }
}

impl<T: fmt::Debug> fmt::Display for AsOfBounds<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "[{:?} .. {:?}]",
            self.lower.elements(),
            self.upper.elements()
        )
    }
}

/// Types of bounds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BoundType {
    Lower,
    Upper,
}

impl fmt::Display for BoundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lower => f.write_str("lower"),
            Self::Upper => f.write_str("upper"),
        }
    }
}

/// Types of constraints.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConstraintType {
    /// Hard constraints are applied to enforce correctness properties, and failing to apply them is
    /// an error.
    Hard,
    /// Soft constraints are applied to improve performance or UX, and failing to apply them is
    /// undesirable but not an error.
    Soft,
}

/// A constraint that can be applied to the `AsOfBounds` of a collection.
#[derive(Debug)]
struct Constraint<'a, T> {
    type_: ConstraintType,
    /// Which bound this constraint applies to.
    bound_type: BoundType,
    /// The frontier by which the bound should be constrained.
    frontier: &'a Antichain<T>,
    /// A short description of the reason for applying this constraint.
    ///
    /// Used only for logging.
    reason: &'a str,
}

impl<T: Timestamp> Constraint<'_, T> {
    /// Applies this constraint to the given bounds.
    ///
    /// Returns a bool indicating whether the given bounds were changed as a result.
    ///
    /// Applying a constraint can fail if the constraint frontier is incompatible with the
    /// existing bounds. In this case, the constraint still gets partially applied by moving one of
    /// the bounds up/down to the other, depending on the `bound_type`.
    ///
    /// Applying a constraint to sealed bounds is a no-op.
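    ///
    /// A worked example with hypothetical frontiers:
    ///
    /// ```text
    /// bounds = [{10} .. {20}]
    /// apply(Lower, {15})  =>  Ok(true);  bounds = [{15} .. {20}]
    /// apply(Upper, {30})  =>  Ok(false); bounds = [{15} .. {20}]
    /// apply(Lower, {25})  =>  Err(true); bounds = [{20} .. {20}], sealed
    /// ```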
    fn apply(&self, bounds: &mut AsOfBounds<T>) -> Result<bool, bool> {
        if bounds.sealed {
            return Ok(false);
        }

        match self.bound_type {
            BoundType::Lower => {
                if PartialOrder::less_than(&bounds.upper, self.frontier) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.lower.clone_from(&bounds.upper);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(self.frontier, &bounds.lower) {
                    Ok(false)
                } else {
                    bounds.lower.clone_from(self.frontier);
                    Ok(true)
                }
            }
            BoundType::Upper => {
                if PartialOrder::less_than(self.frontier, &bounds.lower) {
                    bounds.sealed = true;
                    if PartialOrder::less_than(&bounds.lower, &bounds.upper) {
                        bounds.upper.clone_from(&bounds.lower);
                        Err(true)
                    } else {
                        Err(false)
                    }
                } else if PartialOrder::less_equal(&bounds.upper, self.frontier) {
                    Ok(false)
                } else {
                    bounds.upper.clone_from(self.frontier);
                    Ok(true)
                }
            }
        }
    }
}

/// State tracked for a compute collection during as-of selection.
struct Collection<'a, T> {
    storage_inputs: Vec<GlobalId>,
    compute_inputs: Vec<GlobalId>,
    read_policy: Option<&'a ReadPolicy<T>>,
    /// The currently known as-of bounds.
    ///
    /// Shared between collections exported by the same dataflow.
    bounds: Rc<RefCell<AsOfBounds<T>>>,
    /// Whether this collection is an index.
    is_index: bool,
}

/// The as-of selection context.
struct Context<'a, T> {
    collections: BTreeMap<GlobalId, Collection<'a, T>>,
    storage_collections: &'a dyn StorageCollections<Timestamp = T>,
    current_time: T,
}

impl<'a, T: TimestampManipulation> Context<'a, T> {
    /// Initializes an as-of selection context for the given `dataflows`.
    fn new(
        dataflows: &[DataflowDescription<Plan<T>, (), T>],
        storage_collections: &'a dyn StorageCollections<Timestamp = T>,
        read_policies: &'a BTreeMap<GlobalId, ReadPolicy<T>>,
        current_time: T,
    ) -> Self {
        // Construct initial collection state for each dataflow export. Dataflows might have their
        // as-ofs already fixed, which we need to take into account when constructing `AsOfBounds`.
        let mut collections = BTreeMap::new();
        for dataflow in dataflows {
            let storage_inputs: Vec<_> = dataflow.source_imports.keys().copied().collect();
            let compute_inputs: Vec<_> = dataflow.index_imports.keys().copied().collect();

            let bounds = match dataflow.as_of.clone() {
                Some(frontier) => AsOfBounds::single(frontier),
                None => AsOfBounds::default(),
            };
            let bounds = Rc::new(RefCell::new(bounds));

            for id in dataflow.export_ids() {
                let collection = Collection {
                    storage_inputs: storage_inputs.clone(),
                    compute_inputs: compute_inputs.clone(),
                    read_policy: read_policies.get(&id),
                    bounds: Rc::clone(&bounds),
                    is_index: dataflow.index_exports.contains_key(&id),
                };
                collections.insert(id, collection);
            }
        }

        Self {
            collections,
            storage_collections,
            current_time,
        }
    }

    /// Returns the state of the identified collection.
    ///
    /// # Panics
    ///
    /// Panics if the identified collection doesn't exist.
    fn expect_collection(&self, id: GlobalId) -> &Collection<'_, T> {
        self.collections
            .get(&id)
            .unwrap_or_else(|| panic!("collection missing: {id}"))
    }

    /// Applies the given as-of constraint to the identified collection.
    ///
    /// Returns whether the collection's as-of bounds were changed as a result.
    fn apply_constraint(&self, id: GlobalId, constraint: Constraint<T>) -> bool {
        let collection = self.expect_collection(id);
        let mut bounds = collection.bounds.borrow_mut();
        match constraint.apply(&mut bounds) {
            Ok(changed) => {
                if changed {
                    info!(%id, %bounds, reason = %constraint.reason, "applied as-of constraint");
                }
                changed
            }
            Err(changed) => {
                match constraint.type_ {
                    ConstraintType::Hard => {
                        soft_panic_or_log!(
                            "failed to apply hard as-of constraint \
                             (id={id}, bounds={bounds}, constraint={constraint:?})"
                        );
                    }
                    ConstraintType::Soft => {
                        info!(%id, %bounds, ?constraint, "failed to apply soft as-of constraint");
                    }
                }
                changed
            }
        }
    }

    /// Apply as-of constraints imposed by the frontiers of upstream storage collections.
    ///
    /// A collection's as-of _must_ be >= the read frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint to a collection is an error. The affected dataflow will
    /// not be able to hydrate successfully.
    fn apply_upstream_storage_constraints(
        &self,
        storage_read_holds: &BTreeMap<GlobalId, ReadHold<T>>,
    ) {
        // Apply direct constraints from storage inputs.
        for (id, collection) in &self.collections {
            for input_id in &collection.storage_inputs {
                let read_hold = &storage_read_holds[input_id];
                let constraint = Constraint {
                    type_: ConstraintType::Hard,
                    bound_type: BoundType::Lower,
                    frontier: read_hold.since(),
                    reason: &format!("storage input {input_id} read frontier"),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Propagate constraints downstream, restoring `AsOfBounds` invariant (1).
        self.propagate_bounds_downstream(BoundType::Lower);
    }

    /// Apply as-of constraints imposed by the frontiers of downstream storage collections.
    ///
    /// A collection's as-of _must_ be < the write frontier of the storage collection it exports
    /// to (if any) when that storage collection is non-empty, and <= its read frontier otherwise.
    ///
    /// Rationale:
    ///
    /// * A collection's as-of must be <= the write frontier of its dependent storage collection,
    ///   because we need to pick up computing the contents of storage collections where we left
    ///   off previously, to avoid skipped times observable in the durable output.
    /// * Some dataflows feeding into storage collections (specifically: continual tasks) need to
    ///   be able to observe input changes at times they write to the output. If we selected the
    ///   as-of to be equal to the write frontier of the output storage collection, we wouldn't be
    ///   able to produce the correct output at that frontier. Thus the selected as-of must be
    ///   strictly less than the write frontier.
    /// * As an exception to the above, if the output storage collection is empty (i.e. its write
    ///   frontier is <= its read frontier), we need to allow the as-of to be equal to the read
    ///   frontier. This is correct in the sense that it mirrors the timestamp selection behavior
    ///   of the sequencer when it created the collection. Chances are that the sequencer chose the
    ///   initial as-of (and therefore the initial read frontier of the storage collection) as the
    ///   smallest possible time that can still be read from the collection inputs, so forcing the
    ///   upper bound any lower than that read frontier would produce a hard constraint violation.
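    ///
    /// As a hypothetical illustration: an output storage collection with read frontier `{10}`
    /// and write frontier `{20}` constrains the as-of upper bound to `{19}` (one step back from
    /// the write frontier), while an empty output collection with read and write frontiers both
    /// at `{10}` constrains it to `{10}`.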
    ///
    /// Failing to apply this constraint to a collection is an error. The storage collection it
    /// exports to may have times visible to readers skipped in its output, violating correctness.
    fn apply_downstream_storage_constraints(&self) {
        // Apply direct constraints from storage exports.
        for id in self.collections.keys() {
            let Ok(frontiers) = self.storage_collections.collection_frontiers(*id) else {
                continue;
            };

            let collection_empty =
                PartialOrder::less_equal(&frontiers.write_frontier, &frontiers.read_capabilities);
            let upper = if collection_empty {
                frontiers.read_capabilities
            } else {
                Antichain::from_iter(
                    frontiers
                        .write_frontier
                        .iter()
                        .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                )
            };

            let constraint = Constraint {
                type_: ConstraintType::Hard,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!("storage export {id} write frontier"),
            };
            self.apply_constraint(*id, constraint);
        }

        // Propagate constraints upstream, restoring `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure collections can hydrate immediately.
    ///
    /// A collection's as-of _should_ be < the write frontier of each of its (transitive) storage
    /// inputs.
    ///
    /// Failing to apply this constraint is not an error. The affected dataflow will not be able to
    /// hydrate immediately, but it will be able to hydrate once its inputs have sufficiently
    /// advanced.
    fn apply_warmup_constraints(&self) {
        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the warmup constraint.
        for (id, write_frontier) in write_frontiers {
            let upper = step_back_frontier(&write_frontier);
            let constraint = Constraint {
                type_: ConstraintType::Soft,
                bound_type: BoundType::Upper,
                frontier: &upper,
                reason: &format!(
                    "warmup frontier derived from storage write frontier {:?}",
                    write_frontier.elements()
                ),
            };
            self.apply_constraint(id, constraint);
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes contain historical data as requested by their
    /// associated read policies.
    ///
    /// An index's as-of _should_ be <= the frontier determined by its read policy applied to its
    /// write frontier.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not contain
    /// historical times for its entire compaction window initially, but will do so once sufficient
    /// time has passed.
    fn apply_index_read_policy_constraints(&self) {
        // For the write frontier of an index, we'll use the least write frontier of its
        // (transitive) storage inputs. This is an upper bound for the write frontier the index
        // could have had before the restart. For indexes without storage inputs we use the current
        // time.

        // Collect write frontiers from storage inputs.
        let mut write_frontiers = BTreeMap::new();
        for (id, collection) in &self.collections {
            let storage_frontiers = self
                .storage_collections
                .collections_frontiers(collection.storage_inputs.clone())
                .expect("storage collections exist");

            let mut write_frontier = Antichain::new();
            for frontiers in storage_frontiers {
                write_frontier.extend(frontiers.write_frontier);
            }

            write_frontiers.insert(*id, write_frontier);
        }

        // Propagate write frontiers through compute inputs.
        fixpoint(|changed| {
            for (id, collection) in &self.collections {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                for input_id in &collection.compute_inputs {
                    let frontier = &write_frontiers[input_id];
                    *changed |= write_frontier.extend(frontier.iter().cloned());
                }
                write_frontiers.insert(*id, write_frontier);
            }
        });

        // Apply the read policy constraint to indexes.
        for (id, collection) in &self.collections {
            if let (true, Some(read_policy)) = (collection.is_index, &collection.read_policy) {
                let mut write_frontier = write_frontiers.remove(id).expect("inserted above");
                if write_frontier.is_empty() {
                    write_frontier = Antichain::from_elem(self.current_time.clone());
                }
                let upper = read_policy.frontier(write_frontier.borrow());
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: &format!(
                        "read policy applied to write frontier {:?}",
                        write_frontier.elements()
                    ),
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Apply as-of constraints to ensure indexes are immediately readable.
    ///
    /// An index's as-of _should_ be <= the current time.
    ///
    /// Failing to apply this constraint is not an error. The affected index will not be readable
    /// immediately, but will be readable once sufficient time has passed.
    fn apply_index_current_time_constraints(&self) {
        // Apply the current time constraint to indexes.
        let upper = Antichain::from_elem(self.current_time.clone());
        for (id, collection) in &self.collections {
            if collection.is_index {
                let constraint = Constraint {
                    type_: ConstraintType::Soft,
                    bound_type: BoundType::Upper,
                    frontier: &upper,
                    reason: "index current time",
                };
                self.apply_constraint(*id, constraint);
            }
        }

        // Restore `AsOfBounds` invariant (2).
        self.propagate_bounds_upstream(BoundType::Upper);
    }

    /// Propagate as-of bounds through the dependency graph, in downstream direction.
    fn propagate_bounds_downstream(&self, bound_type: BoundType) {
        // Propagating `lower` bounds downstream restores `AsOfBounds` invariant (1) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Hard,
            BoundType::Upper => ConstraintType::Soft,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_downstream_inner(bound_type, constraint_type, changed);

            // Propagating `upper` bounds downstream might break `AsOfBounds` invariant (2), so we
            // need to restore it.
            if bound_type == BoundType::Upper {
                self.propagate_bounds_upstream_inner(
                    BoundType::Upper,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_downstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in &self.collections {
            for input_id in &collection.compute_inputs {
                let input_collection = self.expect_collection(*input_id);
                let bounds = input_collection.bounds.borrow();
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("upstream {input_id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*id, constraint);
            }
        }
    }

    /// Propagate as-of bounds through the dependency graph, in upstream direction.
    fn propagate_bounds_upstream(&self, bound_type: BoundType) {
        // Propagating `upper` bounds upstream restores `AsOfBounds` invariant (2) and must
        // therefore always succeed.
        let constraint_type = match bound_type {
            BoundType::Lower => ConstraintType::Soft,
            BoundType::Upper => ConstraintType::Hard,
        };

        // We don't want to rely on a correspondence between `GlobalId` order and dependency order,
        // so we use a fixpoint loop here.
        fixpoint(|changed| {
            self.propagate_bounds_upstream_inner(bound_type, constraint_type, changed);

            // Propagating `lower` bounds upstream might break `AsOfBounds` invariant (1), so we
            // need to restore it.
            if bound_type == BoundType::Lower {
                self.propagate_bounds_downstream_inner(
                    BoundType::Lower,
                    ConstraintType::Hard,
                    changed,
                );
            }
        });
    }

    fn propagate_bounds_upstream_inner(
        &self,
        bound_type: BoundType,
        constraint_type: ConstraintType,
        changed: &mut bool,
    ) {
        for (id, collection) in self.collections.iter().rev() {
            let bounds = collection.bounds.borrow();
            for input_id in &collection.compute_inputs {
                let constraint = Constraint {
                    type_: constraint_type,
                    bound_type,
                    frontier: bounds.get(bound_type),
                    reason: &format!("downstream {id} {bound_type} as-of bound"),
                };
                *changed |= self.apply_constraint(*input_id, constraint);
            }
        }
    }

    /// Selects the "best" as-of for the identified collection, based on its currently known
    /// bounds.
    ///
    /// We simply use the upper bound here, to maximize the chances of compute reconciliation
    /// succeeding. Choosing the latest possible as-of also minimizes the amount of work the
    /// dataflow has to spend processing historical data from its sources.
    fn best_as_of(&self, id: GlobalId) -> Antichain<T> {
        if let Some(collection) = self.collections.get(&id) {
            let bounds = collection.bounds.borrow();
            bounds.upper.clone()
        } else {
            Antichain::new()
        }
    }

    /// Removes collections that sink into sealed persist shards from the context.
    ///
    /// The dataflows of these collections will get an empty default as-of assigned at the end of
    /// the as-of selection process, ensuring that they won't get installed unnecessarily.
    ///
    /// Note that it is valid to remove these collections from consideration because they don't
    /// impose as-of constraints on other compute collections.
    fn prune_sealed_persist_sinks(&mut self) {
        self.collections.retain(|id, _| {
            self.storage_collections
                .collection_frontiers(*id)
                .map_or(true, |f| !f.write_frontier.is_empty())
        });
    }

    /// Removes collections depending on storage collections with empty read frontiers.
    ///
    /// The dataflows of these collections will get an empty default as-of assigned at the end of
    /// the as-of selection process, ensuring that they won't get installed.
    ///
    /// This exists only to work around database-issues#8836.
    fn prune_dropped_collections(&mut self) {
        // Remove collections with dropped storage inputs.
        let mut pruned = BTreeSet::new();
        self.collections.retain(|id, c| {
            let input_dropped = c.storage_inputs.iter().any(|id| {
                let frontiers = self
                    .storage_collections
                    .collection_frontiers(*id)
                    .expect("storage collection exists");
                frontiers.read_capabilities.is_empty()
            });

            if input_dropped {
                pruned.insert(*id);
                false
            } else {
                true
            }
        });

        warn!(?pruned, "pruned dependants of dropped storage collections");

        // Remove (transitive) dependants of pruned collections.
        while !pruned.is_empty() {
            let pruned_inputs = std::mem::take(&mut pruned);

            self.collections.retain(|id, c| {
                if c.compute_inputs.iter().any(|id| pruned_inputs.contains(id)) {
                    pruned.insert(*id);
                    false
                } else {
                    true
                }
            });

            warn!(?pruned, "pruned collections with pruned inputs");
        }
    }
}

/// Runs `step` in a loop until it stops reporting changes.
fn fixpoint(mut step: impl FnMut(&mut bool)) {
    loop {
        let mut changed = false;
        step(&mut changed);
        if !changed {
            break;
        }
    }
}

/// Step back the given frontier.
///
/// This function is saturating: if the frontier contains `T::minimum()` times, these are kept
/// unchanged.
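///
/// For example, a frontier `{5}` steps back to `{4}`, while the minimum frontier `{0}` stays
/// `{0}`.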
fn step_back_frontier<T: TimestampManipulation>(frontier: &Antichain<T>) -> Antichain<T> {
    frontier
        .iter()
        .map(|t| t.step_back().unwrap_or_else(T::minimum))
        .collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use async_trait::async_trait;
    use differential_dataflow::lattice::Lattice;
    use futures::future::BoxFuture;
    use futures::stream::BoxStream;
    use mz_compute_types::dataflows::{IndexDesc, IndexImport};
    use mz_compute_types::sinks::ComputeSinkConnection;
    use mz_compute_types::sinks::ComputeSinkDesc;
    use mz_compute_types::sinks::MaterializedViewSinkConnection;
    use mz_compute_types::sources::SourceInstanceArguments;
    use mz_compute_types::sources::SourceInstanceDesc;
    use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
    use mz_persist_types::{Codec64, ShardId};
    use mz_repr::Timestamp;
    use mz_repr::{RelationDesc, RelationVersion, Row, SqlRelationType};
    use mz_storage_client::client::TimestamplessUpdateBuilder;
    use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
    use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
    use mz_storage_types::StorageDiff;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::controller::{CollectionMetadata, StorageError};
    use mz_storage_types::parameters::StorageParameters;
    use mz_storage_types::read_holds::ReadHoldError;
    use mz_storage_types::sources::{GenericSourceConnection, SourceDesc};
    use mz_storage_types::sources::{SourceData, SourceExportDataConfig};
    use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
    use timely::progress::Timestamp as TimelyTimestamp;

    use super::*;

    const SEALED: u64 = 0x5ea1ed;

    fn ts_to_frontier(ts: u64) -> Antichain<Timestamp> {
        if ts == SEALED {
            Antichain::new()
        } else {
            Antichain::from_elem(ts.into())
        }
    }

    #[derive(Debug)]
    struct StorageFrontiers(BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>);

    #[async_trait]
    impl StorageCollections for StorageFrontiers {
        type Timestamp = Timestamp;

        async fn initialize_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _init_ids: BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn update_parameters(&self, _config_params: StorageParameters) {
            unimplemented!()
        }

        fn collection_metadata(
            &self,
            _id: GlobalId,
        ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
            unimplemented!()
        }

        fn collections_frontiers(
            &self,
            ids: Vec<GlobalId>,
        ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>
        {
            let mut frontiers = Vec::with_capacity(ids.len());
            for id in ids {
                let (read, write) = self.0.get(&id).ok_or(StorageError::IdentifierMissing(id))?;
                frontiers.push(CollectionFrontiers {
                    id,
                    write_frontier: write.clone(),
                    implied_capability: read.clone(),
                    read_capabilities: read.clone(),
                })
            }
            Ok(frontiers)
        }

        fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
            unimplemented!()
        }

        fn check_exists(&self, _id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn snapshot_parts_stats(
            &self,
            _id: GlobalId,
            _as_of: Antichain<Self::Timestamp>,
        ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
            unimplemented!()
        }

        fn snapshot(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>
        {
            unimplemented!()
        }

        async fn snapshot_latest(
            &self,
            _id: GlobalId,
        ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn snapshot_cursor(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>,
        >
        where
            Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
        {
            unimplemented!()
        }

        fn snapshot_and_stream(
            &self,
            _id: GlobalId,
            _as_of: Self::Timestamp,
        ) -> BoxFuture<
            'static,
            Result<
                BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
                StorageError<Self::Timestamp>,
            >,
        > {
            unimplemented!()
        }

        /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
        /// updates for the provided [`GlobalId`].
        fn create_update_builder(
            &self,
            _id: GlobalId,
        ) -> BoxFuture<
            'static,
            Result<
                TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
                StorageError<Self::Timestamp>,
            >,
        >
        where
            Self::Timestamp: Lattice + Codec64,
        {
            unimplemented!()
        }

        async fn prepare_state(
            &self,
            _txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
            _ids_to_add: BTreeSet<GlobalId>,
            _ids_to_drop: BTreeSet<GlobalId>,
            _ids_to_register: BTreeMap<GlobalId, ShardId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn create_collections_for_bootstrap(
            &self,
            _storage_metadata: &StorageMetadata,
            _register_ts: Option<Self::Timestamp>,
            _collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
            _migrated_storage_collections: &BTreeSet<GlobalId>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_source_desc(
            &self,
            _ingestion_id: GlobalId,
            _source_desc: SourceDesc,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_export_data_configs(
            &self,
            _source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_ingestion_connections(
            &self,
            _source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        async fn alter_table_desc(
            &self,
            _existing_collection: GlobalId,
            _new_collection: GlobalId,
            _new_desc: RelationDesc,
            _expected_version: RelationVersion,
        ) -> Result<(), StorageError<Self::Timestamp>> {
            unimplemented!()
        }

        fn drop_collections_unvalidated(
            &self,
            _storage_metadata: &StorageMetadata,
            _identifiers: Vec<GlobalId>,
        ) {
            unimplemented!()
        }

        fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
            unimplemented!()
        }

        fn acquire_read_holds(
            &self,
            desired_holds: Vec<GlobalId>,
        ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
            let mut holds = Vec::with_capacity(desired_holds.len());
            for id in desired_holds {
                let (read, _write) = self
                    .0
                    .get(&id)
                    .ok_or(ReadHoldError::CollectionMissing(id))?;
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                holds.push(ReadHold::with_channel(id, read.clone(), tx));
            }
            Ok(holds)
        }

        fn determine_time_dependence(
            &self,
            _id: GlobalId,
        ) -> Result<Option<TimeDependence>, TimeDependenceError> {
            unimplemented!()
        }
    }

    fn dataflow(
        export_id: &str,
        input_ids: &[&str],
        storage_ids: &BTreeSet<&str>,
    ) -> DataflowDescription<Plan> {
        let source_imports = input_ids
            .iter()
            .filter(|s| storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let desc = SourceInstanceDesc {
                    arguments: SourceInstanceArguments {
                        operators: Default::default(),
                    },
                    storage_metadata: Default::default(),
                    typ: SqlRelationType::empty(),
                };
                (id, (desc, Default::default(), Default::default()))
            })
            .collect();
        let index_imports = input_ids
            .iter()
            .filter(|s| !storage_ids.contains(*s))
            .map(|s| {
                let id = s.parse().unwrap();
                let import = IndexImport {
                    desc: IndexDesc {
                        on_id: GlobalId::Transient(0),
                        key: Default::default(),
                    },
                    typ: SqlRelationType::empty(),
                    monotonic: Default::default(),
                };
                (id, import)
            })
            .collect();
        let index_exports = std::iter::once(export_id)
            .filter(|s| !storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = IndexDesc {
                    on_id: GlobalId::Transient(0),
                    key: Default::default(),
                };
                let typ = SqlRelationType::empty();
                (id, (desc, typ))
            })
            .collect();
        let sink_exports = std::iter::once(export_id)
            .filter(|s| storage_ids.contains(*s))
            .map(|sid| {
                let id = sid.parse().unwrap();
                let desc = ComputeSinkDesc {
                    from: GlobalId::Transient(0),
                    from_desc: RelationDesc::empty(),
                    connection: ComputeSinkConnection::MaterializedView(
                        MaterializedViewSinkConnection {
                            value_desc: RelationDesc::empty(),
                            storage_metadata: Default::default(),
                        },
                    ),
                    with_snapshot: Default::default(),
                    up_to: Default::default(),
                    non_null_assertions: Default::default(),
                    refresh_schedule: Default::default(),
                };
                (id, desc)
            })
            .collect();

        DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build: Default::default(),
            index_exports,
            sink_exports,
            as_of: None,
            until: Default::default(),
            initial_storage_as_of: Default::default(),
            refresh_schedule: Default::default(),
            debug_name: Default::default(),
            time_dependence: None,
        }
    }

    macro_rules! testcase {
        ($name:ident, {
            storage: { $( $storage_id:literal: ($read:expr, $write:expr), )* },
            dataflows: [ $( $export_id:literal <- $inputs:expr => $as_of:expr, )* ],
            current_time: $current_time:literal,
            $( read_policies: { $( $policy_id:literal: $policy:expr, )* }, )?
            $( read_only: $read_only:expr, )?
        }) => {
            #[mz_ore::test]
            fn $name() {
                let storage_ids = [$( $storage_id, )*].into();

                let storage_frontiers = StorageFrontiers(BTreeMap::from([
                    $(
                        (
                            $storage_id.parse().unwrap(),
                            (ts_to_frontier($read), ts_to_frontier($write)),
                        ),
                    )*
                ]));

                let mut dataflows = [
                    $(
                        dataflow($export_id, &$inputs, &storage_ids),
                    )*
                ];

                let read_policies = BTreeMap::from([
                    $($( ($policy_id.parse().unwrap(), $policy), )*)?
                ]);

                #[allow(unused_variables)]
                let read_only = false;
                $( let read_only = $read_only; )?

                super::run(
                    &mut dataflows,
                    &read_policies,
                    &storage_frontiers,
                    $current_time.into(),
                    read_only,
                );

                let actual_as_ofs: Vec<_> = dataflows
                    .into_iter()
                    .map(|d| d.as_of.unwrap())
                    .collect();
                let expected_as_ofs = [ $( ts_to_frontier($as_of), )* ];

                assert_eq!(actual_as_ofs, expected_as_ofs);
            }
        };
    }

    testcase!(upstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
        },
        dataflows: [
            "u1" <- ["s1"]       => 10,
            "u2" <- ["s2"]       => 20,
            "u3" <- ["s1", "s2"] => 20,
            "u4" <- ["u1", "u2"] => 20,
        ],
        current_time: 0,
    });

    testcase!(downstream_storage_constraints, {
        storage: {
            "s1": (10, 20),
            "u3": (10, 15),
            "u4": (10, 13),
        },
        dataflows: [
            "u1" <- ["s1"] => 19,
            "u2" <- ["s1"] => 12,
            "u3" <- ["u2"] => 14,
            "u4" <- ["u2"] => 12,
        ],
        current_time: 100,
    });

    testcase!(warmup_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (10, 30),
            "s3": (10, 40),
            "s4": (10, 50),
        },
        dataflows: [
            "u1" <- ["s1"]       => 19,
            "u2" <- ["s2"]       => 19,
            "u3" <- ["s3"]       => 39,
            "u4" <- ["s4"]       => 39,
            "u5" <- ["u1", "u2"] => 19,
            "u6" <- ["u3", "u4"] => 39,
        ],
        current_time: 100,
    });

    testcase!(index_read_policy_constraints, {
        storage: {
            "s1": (10, 20),
            "u6": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s1"] => 10,
            "u3" <- ["s1"] => 13,
            "u4" <- ["s1"] => 10,
            "u5" <- []     => 95,
            "u6" <- ["s1"] => 17,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(15.into(), 1.into()),
            "u3": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
            "u4": ReadPolicy::ValidFrom(Antichain::from_elem(5.into())),
            "u5": ReadPolicy::lag_writes_by(5.into(), 1.into()),
            "u6": ReadPolicy::ValidFrom(Antichain::from_elem(13.into())),
        },
    });

    testcase!(index_current_time_constraints, {
        storage: {
            "s1": (10, 20),
            "s2": (20, 30),
            "u4": (10, 12),
            "u5": (10, 18),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => 20,
            "u3" <- ["s1"] => 11,
            "u4" <- ["u3"] => 11,
            "u5" <- ["s1"] => 17,
            "u6" <- []     => 15,
        ],
        current_time: 15,
    });

    testcase!(sealed_storage_sink, {
        storage: {
            "s1": (10, 20),
            "u1": (10, SEALED),
        },
        dataflows: [
            "u1" <- ["s1"] => SEALED,
        ],
        current_time: 100,
    });

    testcase!(read_only_dropped_storage_inputs, {
        storage: {
            "s1": (10, 20),
            "s2": (SEALED, SEALED),
            "u4": (10, 20),
        },
        dataflows: [
            "u1" <- ["s1"] => 15,
            "u2" <- ["s2"] => SEALED,
            "u3" <- ["s1", "s2"] => SEALED,
            "u4" <- ["u2"] => SEALED,
        ],
        current_time: 15,
        read_only: true,
    });

    // Regression test for database-issues#9273.
    testcase!(github_9273, {
        storage: {
            "s1": (10, 20),
            "u3": (14, 15),
        },
        dataflows: [
            "u1" <- ["s1"] => 14,
            "u2" <- ["u1"] => 19,
            "u3" <- ["u1"] => 14,
        ],
        current_time: 100,
        read_policies: {
            "u1": ReadPolicy::lag_writes_by(1.into(), 1.into()),
            "u2": ReadPolicy::lag_writes_by(1.into(), 1.into()),
        },
    });
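
    // A minimal sketch exercising `Constraint::apply` and `step_back_frontier` directly,
    // complementing the `testcase!`-based integration tests above.
    #[mz_ore::test]
    fn constraint_apply_and_step_back() {
        let mut bounds = AsOfBounds {
            lower: Antichain::from_elem(Timestamp::from(10)),
            upper: Antichain::from_elem(Timestamp::from(20)),
            sealed: false,
        };

        // Refining the lower bound within the existing bounds succeeds.
        let frontier = Antichain::from_elem(Timestamp::from(15));
        let constraint = Constraint {
            type_: ConstraintType::Soft,
            bound_type: BoundType::Lower,
            frontier: &frontier,
            reason: "test",
        };
        assert_eq!(constraint.apply(&mut bounds), Ok(true));
        assert_eq!(bounds.lower, frontier);

        // A lower-bound constraint beyond the upper bound fails and seals the bounds.
        let frontier = Antichain::from_elem(Timestamp::from(30));
        let constraint = Constraint {
            type_: ConstraintType::Soft,
            bound_type: BoundType::Lower,
            frontier: &frontier,
            reason: "test",
        };
        assert_eq!(constraint.apply(&mut bounds), Err(true));
        assert!(bounds.sealed);
        assert_eq!(bounds.lower, bounds.upper);

        // `step_back_frontier` steps each time back by one and saturates at the minimum.
        let frontier = Antichain::from_elem(Timestamp::from(5));
        assert_eq!(
            step_back_frontier(&frontier),
            Antichain::from_elem(Timestamp::from(4))
        );
        let minimum = Antichain::from_elem(Timestamp::minimum());
        assert_eq!(step_back_frontier(&minimum), minimum);
    }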
}