mz_adapter/coord/
timestamp_selection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for selecting timestamps for various operations on collections.
11
12use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
20use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
21use mz_compute_types::ComputeInstanceId;
22use mz_ore::cast::CastLossy;
23use mz_ore::soft_assert_eq_or_log;
24use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
25use mz_sql::plan::QueryWhen;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_sql::session::vars::IsolationLevel;
28use mz_storage_types::sources::Timeline;
29use serde::{Deserialize, Serialize};
30use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
31use tracing::{Level, event};
32
33use crate::AdapterError;
34use crate::catalog::CatalogState;
35use crate::coord::Coordinator;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::read_policy::ReadHolds;
38use crate::coord::timeline::TimelineContext;
39use crate::session::Session;
40
41/// The timeline and timestamp context of a read.
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
43pub enum TimestampContext<T> {
44    /// Read is executed in a specific timeline with a specific timestamp.
45    TimelineTimestamp {
46        timeline: Timeline,
47        /// The timestamp that was chosen for a read. This can differ from the
48        /// `oracle_ts` when collections are not readable at the (linearized)
49        /// timestamp for the oracle. In those cases (when the chosen timestamp
50        /// is further ahead than the oracle timestamp) we have to delay
51        /// returning peek results until the timestamp oracle is also
52        /// sufficiently advanced.
53        chosen_ts: T,
54        /// The timestamp that would have been chosen for the read by the
55        /// (linearized) timestamp oracle). In most cases this will be picked as
56        /// the `chosen_ts`.
57        oracle_ts: Option<T>,
58    },
59    /// Read is execute without a timeline or timestamp.
60    NoTimestamp,
61}
62
63impl<T: TimestampManipulation> TimestampContext<T> {
64    /// Creates a `TimestampContext` from a timestamp and `TimelineContext`.
65    pub fn from_timeline_context(
66        chosen_ts: T,
67        oracle_ts: Option<T>,
68        transaction_timeline: Option<Timeline>,
69        timeline_context: &TimelineContext,
70    ) -> TimestampContext<T> {
71        match timeline_context {
72            TimelineContext::TimelineDependent(timeline) => {
73                if let Some(transaction_timeline) = transaction_timeline {
74                    assert_eq!(timeline, &transaction_timeline);
75                }
76                Self::TimelineTimestamp {
77                    timeline: timeline.clone(),
78                    chosen_ts,
79                    oracle_ts,
80                }
81            }
82            TimelineContext::TimestampDependent => {
83                // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
84                Self::TimelineTimestamp {
85                    timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
86                    chosen_ts,
87                    oracle_ts,
88                }
89            }
90            TimelineContext::TimestampIndependent => Self::NoTimestamp,
91        }
92    }
93
94    /// The timeline belonging to this context, if one exists.
95    pub fn timeline(&self) -> Option<&Timeline> {
96        self.timeline_timestamp().map(|tt| tt.0)
97    }
98
99    /// The timestamp belonging to this context, if one exists.
100    pub fn timestamp(&self) -> Option<&T> {
101        self.timeline_timestamp().map(|tt| tt.1)
102    }
103
104    /// The timeline and timestamp belonging to this context, if one exists.
105    pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
106        match self {
107            Self::TimelineTimestamp {
108                timeline,
109                chosen_ts,
110                ..
111            } => Some((timeline, chosen_ts)),
112            Self::NoTimestamp => None,
113        }
114    }
115
116    /// The timestamp belonging to this context, or a sensible default if one does not exists.
117    pub fn timestamp_or_default(&self) -> T {
118        match self {
119            Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
120            // Anything without a timestamp is given the maximum possible timestamp to indicate
121            // that they have been closed up until the end of time. This allows us to SUBSCRIBE to
122            // static views.
123            Self::NoTimestamp => T::maximum(),
124        }
125    }
126
127    /// Whether or not the context contains a timestamp.
128    pub fn contains_timestamp(&self) -> bool {
129        self.timestamp().is_some()
130    }
131
132    /// Converts this `TimestampContext` to an `Antichain`.
133    pub fn antichain(&self) -> Antichain<T> {
134        Antichain::from_elem(self.timestamp_or_default())
135    }
136}
137
138#[async_trait(?Send)]
139impl TimestampProvider for Coordinator {
140    /// Reports a collection's current read frontier.
141    fn compute_read_frontier(
142        &self,
143        instance: ComputeInstanceId,
144        id: GlobalId,
145    ) -> Antichain<Timestamp> {
146        self.controller
147            .compute
148            .collection_frontiers(id, Some(instance))
149            .expect("id does not exist")
150            .read_frontier
151    }
152
153    /// Reports a collection's current write frontier.
154    fn compute_write_frontier(
155        &self,
156        instance: ComputeInstanceId,
157        id: GlobalId,
158    ) -> Antichain<Timestamp> {
159        self.controller
160            .compute
161            .collection_frontiers(id, Some(instance))
162            .expect("id does not exist")
163            .write_frontier
164    }
165
166    fn storage_frontiers(
167        &self,
168        ids: Vec<GlobalId>,
169    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
170        self.controller
171            .storage
172            .collections_frontiers(ids)
173            .expect("missing collections")
174    }
175
176    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
177        self.acquire_read_holds(id_bundle)
178    }
179
180    fn catalog_state(&self) -> &CatalogState {
181        self.catalog().state()
182    }
183}
184
185/// A timestamp determination, which includes the timestamp, constraints, and session oracle read
186/// timestamp.
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct RawTimestampDetermination<T> {
189    pub timestamp: T,
190    pub constraints: Option<Constraints>,
191    pub session_oracle_read_ts: Option<T>,
192}
193
194#[async_trait(?Send)]
195pub trait TimestampProvider {
196    fn compute_read_frontier(
197        &self,
198        instance: ComputeInstanceId,
199        id: GlobalId,
200    ) -> Antichain<Timestamp>;
201    fn compute_write_frontier(
202        &self,
203        instance: ComputeInstanceId,
204        id: GlobalId,
205    ) -> Antichain<Timestamp>;
206
207    /// Returns the implied capability (since) and write frontier (upper) for
208    /// the specified storage collections.
209    fn storage_frontiers(
210        &self,
211        ids: Vec<GlobalId>,
212    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
213
214    fn catalog_state(&self) -> &CatalogState;
215
216    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
217        let timeline = match timeline_context {
218            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
219            // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
220            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
221            TimelineContext::TimestampIndependent => None,
222        };
223
224        timeline
225    }
226
227    /// Returns true if-and-only-if the given configuration needs a linearized
228    /// read timestamp from a timestamp oracle.
229    ///
230    /// This assumes that the query happens in the context of a timeline. If
231    /// there is no timeline, we cannot and don't have to get a linearized read
232    /// timestamp.
233    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
234        // When we're in the context of a timeline (assumption) and one of these
235        // scenarios hold, we need to use a linearized read timestamp:
236        // - The isolation level is Strict Serializable and the `when` allows us to use the
237        //   the timestamp oracle (ex: queries with no AS OF).
238        // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries).
239        when.must_advance_to_timeline_ts()
240            || (when.can_advance_to_timeline_ts()
241                && matches!(
242                    isolation_level,
243                    IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
244                ))
245    }
246
247    /// Determines the timestamp for a query using the classical logic (as opposed to constraint-based).
248    fn determine_timestamp_classical(
249        session: &Session,
250        read_holds: &ReadHolds<Timestamp>,
251        id_bundle: &CollectionIdBundle,
252        when: &QueryWhen,
253        oracle_read_ts: Option<Timestamp>,
254        compute_instance: ComputeInstanceId,
255        real_time_recency_ts: Option<Timestamp>,
256        isolation_level: &IsolationLevel,
257        timeline: &Option<Timeline>,
258        largest_not_in_advance_of_upper: Timestamp,
259        since: &Antichain<Timestamp>,
260    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
261        let mut session_oracle_read_ts = None;
262        // Each involved trace has a validity interval `[since, upper)`.
263        // The contents of a trace are only guaranteed to be correct when
264        // accumulated at a time greater or equal to `since`, and they
265        // are only guaranteed to be currently present for times not
266        // greater or equal to `upper`.
267        //
268        // The plan is to first determine a timestamp, based on the requested
269        // timestamp policy, and then determine if it can be satisfied using
270        // the compacted arrangements we have at hand. It remains unresolved
271        // what to do if it cannot be satisfied (perhaps the query should use
272        // a larger timestamp and block, perhaps the user should intervene).
273
274        {
275            // TODO: We currently split out getting the oracle timestamp because
276            // it's a potentially expensive call, but a call that can be done in an
277            // async task. TimestampProvider is not Send (nor Sync), so we cannot do
278            // the call to `determine_timestamp_for` (including the oracle call) on
279            // an async task. If/when TimestampProvider can become Send, we can fold
280            // the call to the TimestampOracle back into this function.
281            //
282            // We assert here that the logic that determines the oracle timestamp
283            // matches our expectations.
284
285            if timeline.is_some() && Self::needs_linearized_read_ts(isolation_level, when) {
286                assert!(
287                    oracle_read_ts.is_some(),
288                    "should get a timestamp from the oracle for linearized timeline {:?} but didn't",
289                    timeline
290                );
291            }
292        }
293
294        // Initialize candidate to the minimum correct time.
295        let mut candidate = Timestamp::minimum();
296
297        if let Some(ts) = when.advance_to_timestamp() {
298            candidate.join_assign(&ts);
299        }
300
301        if when.advance_to_since() {
302            candidate.advance_by(since.borrow());
303        }
304
305        // If we've acquired a read timestamp from the timestamp oracle, use it
306        // as the new lower bound for the candidate.
307        // In Strong Session Serializable, we ignore the oracle timestamp for now, unless we need
308        // to use it.
309        if let Some(timestamp) = &oracle_read_ts {
310            if isolation_level != &IsolationLevel::StrongSessionSerializable
311                || when.must_advance_to_timeline_ts()
312            {
313                candidate.join_assign(timestamp);
314            }
315        }
316
317        // We advance to the upper in the following scenarios:
318        // - The isolation level is Serializable and the `when` allows us to advance to upper (ex:
319        //   queries with no AS OF). We avoid using the upper in Strict Serializable to prevent
320        //   reading source data that is being written to in the future.
321        // - The isolation level is Strict Serializable but there is no timelines and the `when`
322        //   allows us to advance to upper.
323        if when.can_advance_to_upper()
324            && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
325        {
326            candidate.join_assign(&largest_not_in_advance_of_upper);
327        }
328
329        if let Some(real_time_recency_ts) = real_time_recency_ts {
330            if !(session.vars().real_time_recency()
331                && isolation_level == &IsolationLevel::StrictSerializable)
332            {
333                // Erring on the side of caution, lets bail out here.
334                // This should never happen in practice, as the real time recency timestamp should
335                // only be supplied when real time recency is enabled.
336                coord_bail!(
337                    "real time recency timestamp should only be supplied when real time recency \
338                            is enabled and the isolation level is strict serializable"
339                );
340            }
341            candidate.join_assign(&real_time_recency_ts);
342        }
343
344        if isolation_level == &IsolationLevel::StrongSessionSerializable {
345            if let Some(timeline) = &timeline {
346                if let Some(oracle) = session.get_timestamp_oracle(timeline) {
347                    let session_ts = oracle.read_ts();
348                    candidate.join_assign(&session_ts);
349                    session_oracle_read_ts = Some(session_ts);
350                }
351            }
352
353            // When advancing the read timestamp under Strong Session Serializable, there is a
354            // trade-off to make between freshness and latency. We can choose a timestamp close the
355            // `upper`, but then later queries might block if the `upper` is too far into the
356            // future. We can chose a timestamp close to the current time, but then we may not be
357            // getting results that are as fresh as possible. As a heuristic, we choose the minimum
358            // of now and the upper, where we use the global timestamp oracle read timestamp as a
359            // proxy for now. If upper > now, then we choose now and prevent blocking future
360            // queries. If upper < now, then we choose the upper and prevent blocking the current
361            // query.
362            if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
363                let mut advance_to = largest_not_in_advance_of_upper;
364                if let Some(oracle_read_ts) = oracle_read_ts {
365                    advance_to = std::cmp::min(advance_to, oracle_read_ts);
366                }
367                candidate.join_assign(&advance_to);
368            }
369        }
370
371        // If the timestamp is greater or equal to some element in `since` we are
372        // assured that the answer will be correct.
373        //
374        // It's ok for this timestamp to be larger than the current timestamp of
375        // the timestamp oracle. For Strict Serializable queries, the Coord will
376        // linearize the query by holding back the result until the timestamp
377        // oracle catches up.
378        let timestamp = if since.less_equal(&candidate) {
379            event!(
380                Level::DEBUG,
381                conn_id = format!("{}", session.conn_id()),
382                since = format!("{since:?}"),
383                largest_not_in_advance_of_upper = format!("{largest_not_in_advance_of_upper}"),
384                timestamp = format!("{candidate}")
385            );
386            candidate
387        } else {
388            coord_bail!(generate_timestamp_not_valid_error_msg(
389                id_bundle,
390                compute_instance,
391                read_holds,
392                candidate
393            ));
394        };
395        Ok(RawTimestampDetermination {
396            timestamp,
397            constraints: None,
398            session_oracle_read_ts,
399        })
400    }
401
402    /// Uses constraints and preferences to determine a timestamp for a query.
403    /// Returns the determined timestamp, the constraints that were applied, and
404    /// session_oracle_read_ts.
405    fn determine_timestamp_via_constraints(
406        session: &Session,
407        read_holds: &ReadHolds<Timestamp>,
408        id_bundle: &CollectionIdBundle,
409        when: &QueryWhen,
410        oracle_read_ts: Option<Timestamp>,
411        compute_instance: ComputeInstanceId,
412        real_time_recency_ts: Option<Timestamp>,
413        isolation_level: &IsolationLevel,
414        timeline: &Option<Timeline>,
415        largest_not_in_advance_of_upper: Timestamp,
416    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
417        use constraints::{Constraints, Preference, Reason};
418
419        let mut session_oracle_read_ts = None;
420        // We start by establishing the hard constraints that must be applied to timestamp determination.
421        // These constraints are derived from the input arguments, and properties of the collections involved.
422        // TODO: Many of the constraints are expressed obliquely, and could be made more direct.
423        let constraints = {
424            // Constraints we will populate through a sequence of opinions.
425            let mut constraints = Constraints::default();
426
427            // First, we have validity constraints from the `id_bundle` argument which indicates
428            // which collections we are reading from.
429            // TODO: Refine the detail about which identifiers are binding and which are not.
430            // TODO(dov): It's not entirely clear to me that there ever would be a non
431            // binding constraint introduced by the `id_bundle`. We should revisit this.
432            let since = read_holds.least_valid_read();
433            let storage = id_bundle
434                .storage_ids
435                .iter()
436                .cloned()
437                .collect::<Vec<GlobalId>>();
438            if !storage.is_empty() {
439                constraints
440                    .lower
441                    .push((since.clone(), Reason::StorageInput(storage)));
442            }
443            let compute = id_bundle
444                .compute_ids
445                .iter()
446                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
447                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
448            if !compute.is_empty() {
449                constraints
450                    .lower
451                    .push((since.clone(), Reason::ComputeInput(compute)));
452            }
453
454            // The query's `when` may indicates a specific timestamp we must advance to, or a specific value we must use.
455            if let Some(ts) = when.advance_to_timestamp() {
456                constraints
457                    .lower
458                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
459                // If the query is at a specific timestamp, we must introduce an upper bound as well.
460                if when.constrains_upper() {
461                    constraints
462                        .upper
463                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
464                }
465            }
466
467            // The specification of an `oracle_read_ts` may indicates that we must advance to it,
468            // except in one isolation mode, or if `when` does not indicate that we should.
469            // At the moment, only `QueryWhen::FreshestTableWrite` indicates that we should.
470            // TODO: Should this just depend on the isolation level?
471            if let Some(timestamp) = &oracle_read_ts {
472                if isolation_level != &IsolationLevel::StrongSessionSerializable
473                    || when.must_advance_to_timeline_ts()
474                {
475                    // When specification of an `oracle_read_ts` is required, we must advance to it.
476                    // If it's not present, lets bail out.
477                    constraints.lower.push((
478                        Antichain::from_elem(*timestamp),
479                        Reason::IsolationLevel(*isolation_level),
480                    ));
481                }
482            }
483
484            // If a real time recency timestamp is supplied, we must advance to it.
485            if let Some(real_time_recency_ts) = real_time_recency_ts {
486                assert!(
487                    session.vars().real_time_recency()
488                        && isolation_level == &IsolationLevel::StrictSerializable,
489                    "real time recency timestamp should only be supplied when real time recency \
490                                is enabled and the isolation level is strict serializable"
491                );
492                constraints.lower.push((
493                    Antichain::from_elem(real_time_recency_ts),
494                    Reason::RealTimeRecency,
495                ));
496            }
497
498            // If we are operating in Strong Session Serializable, we use an alternate timestamp lower bound.
499            if isolation_level == &IsolationLevel::StrongSessionSerializable {
500                if let Some(timeline) = &timeline {
501                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
502                        let session_ts = oracle.read_ts();
503                        constraints.lower.push((
504                            Antichain::from_elem(session_ts),
505                            Reason::IsolationLevel(*isolation_level),
506                        ));
507                        session_oracle_read_ts = Some(session_ts);
508                    }
509
510                    // When advancing the read timestamp under Strong Session Serializable, there is a
511                    // trade-off to make between freshness and latency. We can choose a timestamp close the
512                    // `upper`, but then later queries might block if the `upper` is too far into the
513                    // future. We can chose a timestamp close to the current time, but then we may not be
514                    // getting results that are as fresh as possible. As a heuristic, we choose the minimum
515                    // of now and the upper, where we use the global timestamp oracle read timestamp as a
516                    // proxy for now. If upper > now, then we choose now and prevent blocking future
517                    // queries. If upper < now, then we choose the upper and prevent blocking the current
518                    // query.
519                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
520                        let mut advance_to = largest_not_in_advance_of_upper;
521                        if let Some(oracle_read_ts) = oracle_read_ts {
522                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
523                        }
524                        constraints.lower.push((
525                            Antichain::from_elem(advance_to),
526                            Reason::IsolationLevel(*isolation_level),
527                        ));
528                    }
529                }
530            }
531
532            constraints.minimize();
533            constraints
534        };
535
536        // Next we establish the preferences that we would like to apply to timestamp determination.
537        // Generally, we want to choose the freshest timestamp possible, although there are exceptions
538        // when we either want a maximally *stale* timestamp, or we want to protect other queries from
539        // a recklessly advanced timestamp.
540        let preferences = {
541            // Counter-intuitively, the only `when` that allows `can_advance_to_upper` is `Immediately`,
542            // and not `FreshestTableWrite`. This is because `FreshestTableWrite` instead imposes a lower
543            // bound through the `oracle_read_ts`, and then requires the stalest valid timestamp.
544
545            if when.can_advance_to_upper()
546                && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
547            {
548                Preference::FreshestAvailable
549            } else {
550                Preference::StalestValid
551            }
552
553            // TODO: `StrongSessionSerializable` has a different set of preferences that starts to tease
554            // out the trade-off between freshness and responsiveness. I think we don't yet know enough
555            // to properly frame these preferences, though they are clearly aimed at the right concerns.
556        };
557
558        // Determine a candidate based on constraints and preferences.
559        let constraint_candidate = {
560            let mut candidate = Timestamp::minimum();
561            candidate.advance_by(constraints.lower_bound().borrow());
562            // If we have a preference to be the freshest available, advance to the minimum
563            // of the upper bound constraints and the `largest_not_in_advance_of_upper`.
564            if let Preference::FreshestAvailable = preferences {
565                let mut upper_bound = constraints.upper_bound();
566                upper_bound.insert(largest_not_in_advance_of_upper);
567                candidate.advance_by(upper_bound.borrow());
568            }
569            // If the candidate strictly exceeds the upper bound, we didn't have a viable timestamp.
570            if constraints.upper_bound().less_than(&candidate) {
571                coord_bail!(generate_timestamp_not_valid_error_msg(
572                    id_bundle,
573                    compute_instance,
574                    read_holds,
575                    candidate
576                ));
577            } else {
578                candidate
579            }
580        };
581
582        Ok(RawTimestampDetermination {
583            timestamp: constraint_candidate,
584            constraints: Some(constraints),
585            session_oracle_read_ts,
586        })
587    }
588
589    /// Determines the timestamp for a query.
590    ///
591    /// Timestamp determination may fail due to the restricted validity of
592    /// traces. Each has a `since` and `upper` frontier, and are only valid
593    /// after `since` and sure to be available not after `upper`.
594    ///
595    /// The timeline that `id_bundle` belongs to is also returned, if one exists.
596    fn determine_timestamp_for(
597        &self,
598        session: &Session,
599        id_bundle: &CollectionIdBundle,
600        when: &QueryWhen,
601        compute_instance: ComputeInstanceId,
602        timeline_context: &TimelineContext,
603        oracle_read_ts: Option<Timestamp>,
604        real_time_recency_ts: Option<Timestamp>,
605        isolation_level: &IsolationLevel,
606        constraint_based: &ConstraintBasedTimestampSelection,
607    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
608        // First, we acquire read holds that will ensure the queried collections
609        // stay queryable at the chosen timestamp.
610        let read_holds = self.acquire_read_holds(id_bundle);
611
612        let upper = self.least_valid_write(id_bundle);
613
614        Self::determine_timestamp_for_inner(
615            session,
616            id_bundle,
617            when,
618            compute_instance,
619            timeline_context,
620            oracle_read_ts,
621            real_time_recency_ts,
622            isolation_level,
623            constraint_based,
624            read_holds,
625            upper,
626        )
627    }
628
629    /// Same as determine_timestamp_for, but read_holds and least_valid_write are already passed in.
630    fn determine_timestamp_for_inner(
631        session: &Session,
632        id_bundle: &CollectionIdBundle,
633        when: &QueryWhen,
634        compute_instance: ComputeInstanceId,
635        timeline_context: &TimelineContext,
636        oracle_read_ts: Option<Timestamp>,
637        real_time_recency_ts: Option<Timestamp>,
638        isolation_level: &IsolationLevel,
639        constraint_based: &ConstraintBasedTimestampSelection,
640        read_holds: ReadHolds<Timestamp>,
641        upper: Antichain<Timestamp>,
642    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
643        let timeline = Self::get_timeline(timeline_context);
644        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
645        let since = read_holds.least_valid_read();
646
647        let raw_determination = match constraint_based {
648            ConstraintBasedTimestampSelection::Disabled => Self::determine_timestamp_classical(
649                session,
650                &read_holds,
651                id_bundle,
652                when,
653                oracle_read_ts,
654                compute_instance,
655                real_time_recency_ts,
656                isolation_level,
657                &timeline,
658                largest_not_in_advance_of_upper,
659                &since,
660            )?,
661            ConstraintBasedTimestampSelection::Enabled => {
662                Self::determine_timestamp_via_constraints(
663                    session,
664                    &read_holds,
665                    id_bundle,
666                    when,
667                    oracle_read_ts,
668                    compute_instance,
669                    real_time_recency_ts,
670                    isolation_level,
671                    &timeline,
672                    largest_not_in_advance_of_upper,
673                )?
674            }
675            ConstraintBasedTimestampSelection::Verify => {
676                let classical_determination = Self::determine_timestamp_classical(
677                    session,
678                    &read_holds,
679                    id_bundle,
680                    when,
681                    oracle_read_ts,
682                    compute_instance,
683                    real_time_recency_ts,
684                    isolation_level,
685                    &timeline,
686                    largest_not_in_advance_of_upper,
687                    &since,
688                )?;
689
690                match Self::determine_timestamp_via_constraints(
691                    session,
692                    &read_holds,
693                    id_bundle,
694                    when,
695                    oracle_read_ts,
696                    compute_instance,
697                    real_time_recency_ts,
698                    isolation_level,
699                    &timeline,
700                    largest_not_in_advance_of_upper,
701                ) {
702                    Ok(constraint_determination) => {
703                        soft_assert_eq_or_log!(
704                            classical_determination.timestamp,
705                            constraint_determination.timestamp,
706                            "timestamp determination mismatch"
707                        );
708                        if classical_determination.timestamp != constraint_determination.timestamp {
709                            tracing::info!(
710                                "timestamp constrains: {:?}",
711                                constraint_determination.constraints
712                            );
713                        }
714                        RawTimestampDetermination {
715                            timestamp: classical_determination.timestamp,
716                            constraints: constraint_determination.constraints,
717                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
718                        }
719                    }
720                    Err(e) => {
721                        event!(Level::ERROR, error = ?e, "constraint-based timestamp determination failed");
722                        RawTimestampDetermination {
723                            timestamp: classical_determination.timestamp,
724                            constraints: classical_determination.constraints,
725                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
726                        }
727                    }
728                }
729            }
730        };
731
732        let timestamp_context = TimestampContext::from_timeline_context(
733            raw_determination.timestamp,
734            oracle_read_ts,
735            timeline,
736            timeline_context,
737        );
738
739        let determination = TimestampDetermination {
740            timestamp_context,
741            since,
742            upper,
743            largest_not_in_advance_of_upper,
744            oracle_read_ts,
745            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
746            real_time_recency_ts,
747            constraints: raw_determination.constraints,
748        };
749
750        Ok((determination, read_holds))
751    }
752
    /// Acquires [`ReadHolds`] for the given `id_bundle` at the earliest possible
    /// times.
    ///
    /// The returned holds cover every storage and compute collection named in
    /// `id_bundle`. NOTE(review): presumably the holds are released when the
    /// returned value is dropped; confirm against the `ReadHolds` implementation.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
756
757    /// The smallest common valid write frontier among the specified collections.
758    ///
759    /// Times that are not greater or equal to this frontier are complete for all collections
760    /// identified as arguments.
761    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
762        let mut upper = Antichain::new();
763        {
764            for (_id, _since, collection_upper) in
765                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
766            {
767                upper.extend(collection_upper);
768            }
769        }
770        {
771            for (instance, compute_ids) in &id_bundle.compute_ids {
772                for id in compute_ids.iter() {
773                    upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
774                }
775            }
776        }
777        upper
778    }
779
780    /// Returns `least_valid_write` - 1, i.e., each time in `least_valid_write` stepped back in a
781    /// saturating way.
782    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
783        let mut frontier = Antichain::new();
784        for t in self.least_valid_write(id_bundle) {
785            frontier.insert(t.step_back().unwrap_or(t));
786        }
787        frontier
788    }
789}
790
791fn generate_timestamp_not_valid_error_msg(
792    id_bundle: &CollectionIdBundle,
793    compute_instance: ComputeInstanceId,
794    read_holds: &ReadHolds<mz_repr::Timestamp>,
795    candidate: mz_repr::Timestamp,
796) -> String {
797    let mut invalid = Vec::new();
798
799    if let Some(compute_ids) = id_bundle.compute_ids.get(&compute_instance) {
800        for id in compute_ids {
801            let since = read_holds.since(id);
802            if !since.less_equal(&candidate) {
803                invalid.push((*id, since));
804            }
805        }
806    }
807
808    for id in id_bundle.storage_ids.iter() {
809        let since = read_holds.since(id);
810        if !since.less_equal(&candidate) {
811            invalid.push((*id, since));
812        }
813    }
814
815    format!(
816        "Timestamp ({}) is not valid for all inputs: {:?}",
817        candidate, invalid,
818    )
819}
820
821impl Coordinator {
822    pub(crate) async fn oracle_read_ts(
823        &self,
824        session: &Session,
825        timeline_ctx: &TimelineContext,
826        when: &QueryWhen,
827    ) -> Option<Timestamp> {
828        let isolation_level = session.vars().transaction_isolation().clone();
829        let timeline = Coordinator::get_timeline(timeline_ctx);
830        let needs_linearized_read_ts =
831            Coordinator::needs_linearized_read_ts(&isolation_level, when);
832
833        let oracle_read_ts = match timeline {
834            Some(timeline) if needs_linearized_read_ts => {
835                let timestamp_oracle = self.get_timestamp_oracle(&timeline);
836                Some(timestamp_oracle.read_ts().await)
837            }
838            Some(_) | None => None,
839        };
840
841        oracle_read_ts
842    }
843
844    /// Determines the timestamp for a query, acquires read holds that ensure the
845    /// query remains executable at that time, and returns those.
846    /// The caller is responsible for eventually dropping those read holds.
847    #[mz_ore::instrument(level = "debug")]
848    pub(crate) fn determine_timestamp(
849        &self,
850        session: &Session,
851        id_bundle: &CollectionIdBundle,
852        when: &QueryWhen,
853        compute_instance: ComputeInstanceId,
854        timeline_context: &TimelineContext,
855        oracle_read_ts: Option<Timestamp>,
856        real_time_recency_ts: Option<mz_repr::Timestamp>,
857    ) -> Result<
858        (
859            TimestampDetermination<mz_repr::Timestamp>,
860            ReadHolds<mz_repr::Timestamp>,
861        ),
862        AdapterError,
863    > {
864        let constraint_based = ConstraintBasedTimestampSelection::from_str(
865            &CONSTRAINT_BASED_TIMESTAMP_SELECTION
866                .get(self.catalog_state().system_config().dyncfgs()),
867        );
868
869        let isolation_level = session.vars().transaction_isolation();
870        let (det, read_holds) = self.determine_timestamp_for(
871            session,
872            id_bundle,
873            when,
874            compute_instance,
875            timeline_context,
876            oracle_read_ts,
877            real_time_recency_ts,
878            isolation_level,
879            &constraint_based,
880        )?;
881        self.metrics
882            .determine_timestamp
883            .with_label_values(&[
884                match det.respond_immediately() {
885                    true => "true",
886                    false => "false",
887                },
888                isolation_level.as_str(),
889                &compute_instance.to_string(),
890                constraint_based.as_str(),
891            ])
892            .inc();
893        if !det.respond_immediately()
894            && isolation_level == &IsolationLevel::StrictSerializable
895            && real_time_recency_ts.is_none()
896        {
897            if let Some(strict) = det.timestamp_context.timestamp() {
898                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
899                    session,
900                    id_bundle,
901                    when,
902                    compute_instance,
903                    timeline_context,
904                    oracle_read_ts,
905                    real_time_recency_ts,
906                    &IsolationLevel::Serializable,
907                    &constraint_based,
908                )?;
909
910                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
911                    self.metrics
912                        .timestamp_difference_for_strict_serializable_ms
913                        .with_label_values(&[
914                            &compute_instance.to_string(),
915                            constraint_based.as_str(),
916                        ])
917                        .observe(f64::cast_lossy(u64::from(
918                            strict.saturating_sub(*serializable),
919                        )));
920                }
921            }
922        }
923        Ok((det, read_holds))
924    }
925
926    /// The largest timestamp not greater or equal to an element of `upper`.
927    ///
928    /// If no such timestamp exists, for example because `upper` contains only the
929    /// minimal timestamp, the return value is `Timestamp::minimum()`.
930    pub(crate) fn largest_not_in_advance_of_upper(
931        upper: &Antichain<mz_repr::Timestamp>,
932    ) -> mz_repr::Timestamp {
933        // We peek at the largest element not in advance of `upper`, which
934        // involves a subtraction. If `upper` contains a zero timestamp there
935        // is no "prior" answer, and we do not want to peek at it as it risks
936        // hanging awaiting the response to data that may never arrive.
937        if let Some(upper) = upper.as_option() {
938            upper.step_back().unwrap_or_else(Timestamp::minimum)
939        } else {
940            // A complete trace can be read in its final form with this time.
941            //
942            // This should only happen for literals that have no sources or sources that
943            // are known to have completed (non-tailed files for example).
944            Timestamp::MAX
945        }
946    }
947}
948
/// The outcome of determining the timestamp for a query, together with the
/// frontiers and oracle values that informed the choice.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp context from `determine_timestamp`.
    pub timestamp_context: TimestampContext<T>,
    /// The read frontier of all involved inputs, i.e. the least valid read of
    /// the read holds acquired for the query.
    pub since: Antichain<T>,
    /// The write frontier of all involved inputs.
    pub upper: Antichain<T>,
    /// The largest timestamp not in advance of `upper`, as computed by
    /// `Coordinator::largest_not_in_advance_of_upper`.
    pub largest_not_in_advance_of_upper: T,
    /// The value of the timeline's oracle timestamp, if used.
    pub oracle_read_ts: Option<T>,
    /// The value of the session local timestamp's oracle timestamp, if used.
    pub session_oracle_read_ts: Option<T>,
    /// The value of the real time recency timestamp, if used.
    pub real_time_recency_ts: Option<T>,
    /// The constraints used by the constraint based solver.
    /// See the [`constraints`] module for more information.
    /// NOTE(review): presumably `None` when the constraints were not recorded
    /// (e.g. classical selection); confirm against the producers.
    pub constraints: Option<Constraints>,
}
970
971impl<T: TimestampManipulation> TimestampDetermination<T> {
972    pub fn respond_immediately(&self) -> bool {
973        match &self.timestamp_context {
974            TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
975                !self.upper.less_equal(chosen_ts)
976            }
977            TimestampContext::NoTimestamp => true,
978        }
979    }
980}
981
/// A report explaining how a query's timestamp was determined. It is rendered
/// for users by this type's `fmt::Display` impl.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
    /// The chosen timestamp from `determine_timestamp`.
    pub determination: TimestampDetermination<T>,
    /// Details about each source: its name and its read and write frontiers.
    pub sources: Vec<TimestampSource<T>>,
    /// Wall time of first statement executed in this transaction
    pub session_wall_time: DateTime<Utc>,
    /// Cached value of determination.respond_immediately()
    pub respond_immediately: bool,
}
994
/// The frontiers of a single input to a query, as listed in a
/// [`TimestampExplanation`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
    /// Name of the input, printed in the explanation as `source {name}:`.
    pub name: String,
    /// Elements of the input's read frontier.
    pub read_frontier: Vec<T>,
    /// Elements of the input's write frontier.
    pub write_frontier: Vec<T>,
}
1001
/// Values that can be formatted with awareness of the [`Timeline`] they belong to.
pub trait DisplayableInTimeline {
    /// Writes `self` to `f`. When provided, `timeline` may be used to render
    /// the value in a more readable, timeline-specific form.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Pairs `self` with `timeline` in a [`DisplayInTimeline`] wrapper, which
    /// can be formatted with `{}` or `{:?}`.
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
1008
1009impl DisplayableInTimeline for mz_repr::Timestamp {
1010    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1011        if let Some(Timeline::EpochMilliseconds) = timeline {
1012            let ts_ms: u64 = self.into();
1013            if let Ok(ts_ms) = i64::try_from(ts_ms) {
1014                if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
1015                    return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
1016                }
1017            }
1018        }
1019        write!(f, "{:13}", self)
1020    }
1021}
1022
/// Wrapper returned by [`DisplayableInTimeline::display`]. It pairs a borrowed
/// value with the timeline it should be rendered in, so the value can be used
/// with `{}` / `{:?}`.
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value being formatted.
    t: &'a T,
    /// The timeline passed along to [`DisplayableInTimeline::fmt`].
    timeline: Option<&'a Timeline>,
}
1027impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
1028where
1029    T: DisplayableInTimeline,
1030{
1031    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1032        self.t.fmt(self.timeline, f)
1033    }
1034}
1035
1036impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
1037where
1038    T: DisplayableInTimeline,
1039{
1040    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1041        fmt::Display::fmt(&self, f)
1042    }
1043}
1044
1045impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
1046    for TimestampExplanation<T>
1047{
1048    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1049        let timeline = self.determination.timestamp_context.timeline();
1050        writeln!(
1051            f,
1052            "                query timestamp: {}",
1053            self.determination
1054                .timestamp_context
1055                .timestamp_or_default()
1056                .display(timeline)
1057        )?;
1058        if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
1059            writeln!(
1060                f,
1061                "          oracle read timestamp: {}",
1062                oracle_read_ts.display(timeline)
1063            )?;
1064        }
1065        if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
1066            writeln!(
1067                f,
1068                "  session oracle read timestamp: {}",
1069                session_oracle_read_ts.display(timeline)
1070            )?;
1071        }
1072        if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
1073            writeln!(
1074                f,
1075                "    real time recency timestamp: {}",
1076                real_time_recency_ts.display(timeline)
1077            )?;
1078        }
1079        writeln!(
1080            f,
1081            "largest not in advance of upper: {}",
1082            self.determination
1083                .largest_not_in_advance_of_upper
1084                .display(timeline),
1085        )?;
1086        writeln!(
1087            f,
1088            "                          upper:{:?}",
1089            self.determination
1090                .upper
1091                .iter()
1092                .map(|t| t.display(timeline))
1093                .collect::<Vec<_>>()
1094        )?;
1095        writeln!(
1096            f,
1097            "                          since:{:?}",
1098            self.determination
1099                .since
1100                .iter()
1101                .map(|t| t.display(timeline))
1102                .collect::<Vec<_>>()
1103        )?;
1104        writeln!(
1105            f,
1106            "        can respond immediately: {}",
1107            self.respond_immediately
1108        )?;
1109        writeln!(f, "                       timeline: {:?}", &timeline)?;
1110        writeln!(
1111            f,
1112            "              session wall time: {:13} ({})",
1113            self.session_wall_time.timestamp_millis(),
1114            self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
1115        )?;
1116
1117        for source in &self.sources {
1118            writeln!(f, "")?;
1119            writeln!(f, "source {}:", source.name)?;
1120            writeln!(
1121                f,
1122                "                  read frontier:{:?}",
1123                source
1124                    .read_frontier
1125                    .iter()
1126                    .map(|t| t.display(timeline))
1127                    .collect::<Vec<_>>()
1128            )?;
1129            writeln!(
1130                f,
1131                "                 write frontier:{:?}",
1132                source
1133                    .write_frontier
1134                    .iter()
1135                    .map(|t| t.display(timeline))
1136                    .collect::<Vec<_>>()
1137            )?;
1138        }
1139
1140        if let Some(constraints) = &self.determination.constraints {
1141            writeln!(f, "")?;
1142            writeln!(f, "binding constraints:")?;
1143            write!(f, "{}", constraints.display(timeline))?;
1144        }
1145
1146        Ok(())
1147    }
1148}
1149
1150/// Types and logic in support of a constraint-based approach to timestamp determination.
1151mod constraints {
1152
1153    use core::fmt;
1154    use std::fmt::Debug;
1155
1156    use differential_dataflow::lattice::Lattice;
1157    use mz_storage_types::sources::Timeline;
1158    use serde::{Deserialize, Serialize};
1159    use timely::progress::{Antichain, Timestamp};
1160
1161    use mz_compute_types::ComputeInstanceId;
1162    use mz_repr::GlobalId;
1163    use mz_sql::session::vars::IsolationLevel;
1164
1165    use super::DisplayableInTimeline;
1166
1167    /// Constraints expressed on the timestamp of a query.
1168    ///
1169    /// The constraints are expressed on the minimum and maximum values,
1170    /// resulting in a (possibly empty) interval of valid timestamps.
1171    ///
1172    /// The constraints may be redundant, in the interest of providing
1173    /// more complete explanations, but they may also be minimized at
1174    /// any point without altering their behavior by removing redundant
1175    /// constraints.
1176    ///
1177    /// When combined with a `Preference` one can determine an
1178    /// ideal timestamp to use.
1179    #[derive(Default, Serialize, Deserialize, Clone)]
1180    pub struct Constraints {
1181        /// Timestamps and reasons that impose an inclusive lower bound.
1182        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1183        /// Timestamps and reasons that impose an inclusive upper bound.
1184        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1185    }
1186
1187    impl DisplayableInTimeline for Constraints {
1188        fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1189            if !self.lower.is_empty() {
1190                writeln!(f, "lower:")?;
1191                for (ts, reason) in &self.lower {
1192                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1193                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1194                }
1195            }
1196            if !self.upper.is_empty() {
1197                writeln!(f, "upper:")?;
1198                for (ts, reason) in &self.upper {
1199                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1200                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1201                }
1202            }
1203            Ok(())
1204        }
1205    }
1206
1207    impl Debug for Constraints {
1208        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1209            self.display(None).fmt(f)?;
1210            Ok(())
1211        }
1212    }
1213
1214    impl Constraints {
1215        /// Remove constraints that are dominated by other constraints.
1216        ///
1217        /// This removes redundant constraints, without removing constraints
1218        /// that are "tight" in the sense that the interval would be
1219        /// meaningfully different without them.
1220        /// For example, two constraints at the same
1221        /// time will both be retained, in the interest of full information.
1222        /// But a lower bound constraint at time `t` will be removed if there is a
1223        /// constraint at time `t + 1` (or any larger time).
1224        pub fn minimize(&mut self) {
1225            // Establish the upper bound of lower constraints.
1226            let lower_frontier = self.lower_bound();
1227            // Retain constraints that intersect `lower_frontier`.
1228            self.lower.retain(|(anti, _)| {
1229                anti.iter()
1230                    .any(|time| lower_frontier.elements().contains(time))
1231            });
1232
1233            // Establish the lower bound of upper constraints.
1234            let upper_frontier = self.upper_bound();
1235            // Retain constraints that intersect `upper_frontier`.
1236            self.upper.retain(|(anti, _)| {
1237                anti.iter()
1238                    .any(|time| upper_frontier.elements().contains(time))
1239            });
1240        }
1241
1242        /// An antichain equal to the least upper bound of lower bounds.
1243        pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1244            let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1245            for (anti, _) in self.lower.iter() {
1246                lower = lower.join(anti);
1247            }
1248            lower
1249        }
1250        /// An antichain equal to the greatest lower bound of upper bounds.
1251        pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1252            self.upper
1253                .iter()
1254                .flat_map(|(anti, _)| anti.iter())
1255                .cloned()
1256                .collect()
1257        }
1258    }
1259
1260    /// An explanation of reasons for a timestamp constraint.
1261    #[derive(Serialize, Deserialize, Clone)]
1262    pub enum Reason {
1263        /// A compute input at a compute instance.
1264        /// This is something like an index or view
1265        /// that is mantained by compute.
1266        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1267        /// A storage input.
1268        StorageInput(Vec<GlobalId>),
1269        /// A specified isolation level and the timestamp it requires.
1270        IsolationLevel(IsolationLevel),
1271        /// Real-time recency may constrains the timestamp from below.
1272        RealTimeRecency,
1273        /// The query expressed its own constraint on the timestamp.
1274        QueryAsOf,
1275    }
1276
1277    impl Debug for Reason {
1278        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1279            match self {
1280                Reason::ComputeInput(ids) => write_split_ids(f, "ComputeInput", ids),
1281                Reason::StorageInput(ids) => write_split_ids(f, "StorageInput", ids),
1282                Reason::IsolationLevel(level) => {
1283                    write!(f, "IsolationLevel({:?})", level)
1284                }
1285                Reason::RealTimeRecency => {
1286                    write!(f, "RealTimeRecency")
1287                }
1288                Reason::QueryAsOf => {
1289                    write!(f, "QueryAsOf")
1290                }
1291            }
1292        }
1293    }
1294
1295    //TODO: This is a bit of a hack to make the debug output of constraints more readable.
1296    //We should probably have a more structured way to do this.
1297    fn write_split_ids<T: Debug>(f: &mut fmt::Formatter, label: &str, ids: &[T]) -> fmt::Result {
1298        let (ids, rest) = if ids.len() > 10 {
1299            ids.split_at(10)
1300        } else {
1301            let rest: &[T] = &[];
1302            (ids, rest)
1303        };
1304        if rest.is_empty() {
1305            write!(f, "{}({:?})", label, ids)
1306        } else {
1307            write!(f, "{}({:?}, ... {} more)", label, ids, rest.len())
1308        }
1309    }
1310
1311    /// Given an interval [read, write) of timestamp options,
1312    /// this expresses a preference for either end of the spectrum.
1313    pub enum Preference {
1314        /// Prefer the greatest timestamp immediately available.
1315        ///
1316        /// This considers the immediate inputs to a query and
1317        /// selects the greatest timestamp not greater or equal
1318        /// to any of their write frontiers.
1319        ///
1320        /// The preference only relates to immediate query inputs,
1321        /// but it could be extended to transitive inputs as well.
1322        /// For example, one could imagine prefering the freshest
1323        /// data known to be ingested into Materialize, under the
1324        /// premise that those answers should soon become available,
1325        /// and may be more fresh than the immediate inputs.
1326        FreshestAvailable,
1327        /// Prefer the least valid timeastamp.
1328        ///
1329        /// This is useful when one has no expressed freshness
1330        /// constraints, and wants to minimally impact others.
1331        /// For example, `AS OF AT LEAST <time>`.
1332        StalestValid,
1333    }
1334}