mz_adapter/coord/
timestamp_selection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for selecting timestamps for various operations on collections.
11
12use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
20use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
21use mz_compute_types::ComputeInstanceId;
22use mz_ore::cast::CastLossy;
23use mz_ore::soft_assert_eq_or_log;
24use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
25use mz_sql::plan::QueryWhen;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_sql::session::vars::IsolationLevel;
28use mz_storage_types::sources::Timeline;
29use serde::{Deserialize, Serialize};
30use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
31use tracing::{Level, event};
32
33use crate::AdapterError;
34use crate::catalog::CatalogState;
35use crate::coord::Coordinator;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::read_policy::ReadHolds;
38use crate::coord::timeline::TimelineContext;
39use crate::session::Session;
40
/// The timeline and timestamp context of a read.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext<T> {
    /// Read is executed in a specific timeline with a specific timestamp.
    TimelineTimestamp {
        /// The timeline in which the read is executed.
        timeline: Timeline,
        /// The timestamp that was chosen for a read. This can differ from the
        /// `oracle_ts` when collections are not readable at the (linearized)
        /// timestamp of the oracle. In those cases (when the chosen timestamp
        /// is further ahead than the oracle timestamp) we have to delay
        /// returning peek results until the timestamp oracle is also
        /// sufficiently advanced.
        chosen_ts: T,
        /// The timestamp that would have been chosen for the read by the
        /// (linearized) timestamp oracle. In most cases this will be picked as
        /// the `chosen_ts`.
        oracle_ts: Option<T>,
    },
    /// Read is executed without a timeline or timestamp.
    NoTimestamp,
}
62
63impl<T: TimestampManipulation> TimestampContext<T> {
64    /// Creates a `TimestampContext` from a timestamp and `TimelineContext`.
65    pub fn from_timeline_context(
66        chosen_ts: T,
67        oracle_ts: Option<T>,
68        transaction_timeline: Option<Timeline>,
69        timeline_context: &TimelineContext,
70    ) -> TimestampContext<T> {
71        match timeline_context {
72            TimelineContext::TimelineDependent(timeline) => {
73                if let Some(transaction_timeline) = transaction_timeline {
74                    assert_eq!(timeline, &transaction_timeline);
75                }
76                Self::TimelineTimestamp {
77                    timeline: timeline.clone(),
78                    chosen_ts,
79                    oracle_ts,
80                }
81            }
82            TimelineContext::TimestampDependent => {
83                // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
84                Self::TimelineTimestamp {
85                    timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
86                    chosen_ts,
87                    oracle_ts,
88                }
89            }
90            TimelineContext::TimestampIndependent => Self::NoTimestamp,
91        }
92    }
93
94    /// The timeline belonging to this context, if one exists.
95    pub fn timeline(&self) -> Option<&Timeline> {
96        self.timeline_timestamp().map(|tt| tt.0)
97    }
98
99    /// The timestamp belonging to this context, if one exists.
100    pub fn timestamp(&self) -> Option<&T> {
101        self.timeline_timestamp().map(|tt| tt.1)
102    }
103
104    /// The timeline and timestamp belonging to this context, if one exists.
105    pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
106        match self {
107            Self::TimelineTimestamp {
108                timeline,
109                chosen_ts,
110                ..
111            } => Some((timeline, chosen_ts)),
112            Self::NoTimestamp => None,
113        }
114    }
115
116    /// The timestamp belonging to this context, or a sensible default if one does not exists.
117    pub fn timestamp_or_default(&self) -> T {
118        match self {
119            Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
120            // Anything without a timestamp is given the maximum possible timestamp to indicate
121            // that they have been closed up until the end of time. This allows us to SUBSCRIBE to
122            // static views.
123            Self::NoTimestamp => T::maximum(),
124        }
125    }
126
127    /// Whether or not the context contains a timestamp.
128    pub fn contains_timestamp(&self) -> bool {
129        self.timestamp().is_some()
130    }
131
132    /// Converts this `TimestampContext` to an `Antichain`.
133    pub fn antichain(&self) -> Antichain<T> {
134        Antichain::from_elem(self.timestamp_or_default())
135    }
136}
137
138#[async_trait(?Send)]
139impl TimestampProvider for Coordinator {
140    /// Reports a collection's current read frontier.
141    fn compute_read_frontier(
142        &self,
143        instance: ComputeInstanceId,
144        id: GlobalId,
145    ) -> Antichain<Timestamp> {
146        self.controller
147            .compute
148            .collection_frontiers(id, Some(instance))
149            .expect("id does not exist")
150            .read_frontier
151    }
152
153    /// Reports a collection's current write frontier.
154    fn compute_write_frontier(
155        &self,
156        instance: ComputeInstanceId,
157        id: GlobalId,
158    ) -> Antichain<Timestamp> {
159        self.controller
160            .compute
161            .collection_frontiers(id, Some(instance))
162            .expect("id does not exist")
163            .write_frontier
164    }
165
166    fn storage_frontiers(
167        &self,
168        ids: Vec<GlobalId>,
169    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
170        self.controller
171            .storage
172            .collections_frontiers(ids)
173            .expect("missing collections")
174    }
175
176    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
177        self.acquire_read_holds(id_bundle)
178    }
179
180    fn catalog_state(&self) -> &CatalogState {
181        self.catalog().state()
182    }
183}
184
/// A timestamp determination, which includes the timestamp, constraints, and session oracle read
/// timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination<T> {
    /// The timestamp selected for the query.
    pub timestamp: T,
    /// The constraints that were applied to select `timestamp`. Populated by the
    /// constraint-based selection; the classical selection leaves this `None`.
    pub constraints: Option<Constraints>,
    /// The read timestamp of the session's own timestamp oracle, when one was consulted
    /// (Strong Session Serializable, within a timeline, with a session oracle for that
    /// timeline); otherwise `None`.
    pub session_oracle_read_ts: Option<T>,
}
193
194#[async_trait(?Send)]
195pub trait TimestampProvider {
    /// Reports the current read frontier of collection `id` on compute instance `instance`.
    fn compute_read_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
    /// Reports the current write frontier of collection `id` on compute instance `instance`.
    fn compute_write_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
206
    /// Returns the implied capability (since) and write frontier (upper) for
    /// the specified storage collections.
    ///
    /// NOTE(review): each entry is presumably `(id, since, upper)` in that order — confirm
    /// against implementors.
    fn storage_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
213
    /// Returns a reference to the current catalog state.
    fn catalog_state(&self) -> &CatalogState;
215
216    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
217        let timeline = match timeline_context {
218            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
219            // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
220            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
221            TimelineContext::TimestampIndependent => None,
222        };
223
224        timeline
225    }
226
227    /// Returns true if-and-only-if the given configuration needs a linearized
228    /// read timestamp from a timestamp oracle.
229    ///
230    /// This assumes that the query happens in the context of a timeline. If
231    /// there is no timeline, we cannot and don't have to get a linearized read
232    /// timestamp.
233    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
234        // When we're in the context of a timeline (assumption) and one of these
235        // scenarios hold, we need to use a linearized read timestamp:
236        // - The isolation level is Strict Serializable and the `when` allows us to use the
237        //   the timestamp oracle (ex: queries with no AS OF).
238        // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries).
239        when.must_advance_to_timeline_ts()
240            || (when.can_advance_to_timeline_ts()
241                && matches!(
242                    isolation_level,
243                    IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
244                ))
245    }
246
    /// Determines the timestamp for a query using the classical logic (as opposed to constraint-based).
    ///
    /// The candidate starts at `Timestamp::minimum()` and is only ever moved forward
    /// (`join_assign` / `advance_by`) by each applicable lower bound: the query's
    /// `advance_to_timestamp`, `since`, the oracle read timestamp, the upper, the real time
    /// recency timestamp, and (under Strong Session Serializable) the session oracle timestamp.
    ///
    /// # Errors
    ///
    /// Returns an error if a real time recency timestamp is supplied without real time
    /// recency being enabled under Strict Serializable, or if the final candidate is not
    /// readable given `since`.
    ///
    /// # Panics
    ///
    /// Panics if there is a timeline and `needs_linearized_read_ts` holds, but
    /// `oracle_read_ts` is `None`.
    fn determine_timestamp_classical(
        session: &Session,
        read_holds: &ReadHolds<Timestamp>,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        oracle_read_ts: Option<Timestamp>,
        compute_instance: ComputeInstanceId,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
        timeline: &Option<Timeline>,
        largest_not_in_advance_of_upper: Timestamp,
        since: &Antichain<Timestamp>,
    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
        // Only set under Strong Session Serializable when the session has an oracle for the
        // timeline.
        let mut session_oracle_read_ts = None;
        // Each involved trace has a validity interval `[since, upper)`.
        // The contents of a trace are only guaranteed to be correct when
        // accumulated at a time greater or equal to `since`, and they
        // are only guaranteed to be currently present for times not
        // greater or equal to `upper`.
        //
        // The plan is to first determine a timestamp, based on the requested
        // timestamp policy, and then determine if it can be satisfied using
        // the compacted arrangements we have at hand. It remains unresolved
        // what to do if it cannot be satisfied (perhaps the query should use
        // a larger timestamp and block, perhaps the user should intervene).

        {
            // TODO: We currently split out getting the oracle timestamp because
            // it's a potentially expensive call, but a call that can be done in an
            // async task. TimestampProvider is not Send (nor Sync), so we cannot do
            // the call to `determine_timestamp_for` (including the oracle call) on
            // an async task. If/when TimestampProvider can become Send, we can fold
            // the call to the TimestampOracle back into this function.
            //
            // We assert here that the logic that determines the oracle timestamp
            // matches our expectations.

            if timeline.is_some() && Self::needs_linearized_read_ts(isolation_level, when) {
                assert!(
                    oracle_read_ts.is_some(),
                    "should get a timestamp from the oracle for linearized timeline {:?} but didn't",
                    timeline
                );
            }
        }

        // Initialize candidate to the minimum correct time. Every step below only moves it
        // forward, so each step acts as an additional lower bound.
        let mut candidate = Timestamp::minimum();

        if let Some(ts) = when.advance_to_timestamp() {
            candidate.join_assign(&ts);
        }

        if when.advance_to_since() {
            candidate.advance_by(since.borrow());
        }

        // If we've acquired a read timestamp from the timestamp oracle, use it
        // as the new lower bound for the candidate.
        // In Strong Session Serializable, we ignore the oracle timestamp for now, unless we need
        // to use it.
        if let Some(timestamp) = &oracle_read_ts {
            if isolation_level != &IsolationLevel::StrongSessionSerializable
                || when.must_advance_to_timeline_ts()
            {
                candidate.join_assign(timestamp);
            }
        }

        // We advance to the upper in the following scenarios:
        // - The isolation level is Serializable and the `when` allows us to advance to upper (ex:
        //   queries with no AS OF). We avoid using the upper in Strict Serializable to prevent
        //   reading source data that is being written to in the future.
        // - There is no timeline (regardless of isolation level) and the `when` allows us to
        //   advance to upper.
        if when.can_advance_to_upper()
            && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
        {
            candidate.join_assign(&largest_not_in_advance_of_upper);
        }

        if let Some(real_time_recency_ts) = real_time_recency_ts {
            if !(session.vars().real_time_recency()
                && isolation_level == &IsolationLevel::StrictSerializable)
            {
                // Erring on the side of caution, let's bail out here.
                // This should never happen in practice, as the real time recency timestamp should
                // only be supplied when real time recency is enabled.
                coord_bail!(
                    "real time recency timestamp should only be supplied when real time recency \
                            is enabled and the isolation level is strict serializable"
                );
            }
            candidate.join_assign(&real_time_recency_ts);
        }

        if isolation_level == &IsolationLevel::StrongSessionSerializable {
            // The session's own oracle (if any) provides an additional lower bound; remember
            // its read timestamp so it can be reported back to the caller.
            if let Some(timeline) = &timeline {
                if let Some(oracle) = session.get_timestamp_oracle(timeline) {
                    let session_ts = oracle.read_ts();
                    candidate.join_assign(&session_ts);
                    session_oracle_read_ts = Some(session_ts);
                }
            }

            // When advancing the read timestamp under Strong Session Serializable, there is a
            // trade-off to make between freshness and latency. We can choose a timestamp close to
            // the `upper`, but then later queries might block if the `upper` is too far into the
            // future. We can choose a timestamp close to the current time, but then we may not be
            // getting results that are as fresh as possible. As a heuristic, we choose the minimum
            // of now and the upper, where we use the global timestamp oracle read timestamp as a
            // proxy for now. If upper > now, then we choose now and prevent blocking future
            // queries. If upper < now, then we choose the upper and prevent blocking the current
            // query.
            if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
                let mut advance_to = largest_not_in_advance_of_upper;
                if let Some(oracle_read_ts) = oracle_read_ts {
                    advance_to = std::cmp::min(advance_to, oracle_read_ts);
                }
                candidate.join_assign(&advance_to);
            }
        }

        // If the timestamp is greater or equal to some element in `since` we are
        // assured that the answer will be correct.
        //
        // It's ok for this timestamp to be larger than the current timestamp of
        // the timestamp oracle. For Strict Serializable queries, the Coord will
        // linearize the query by holding back the result until the timestamp
        // oracle catches up.
        let timestamp = if since.less_equal(&candidate) {
            event!(
                Level::DEBUG,
                conn_id = format!("{}", session.conn_id()),
                since = format!("{since:?}"),
                largest_not_in_advance_of_upper = format!("{largest_not_in_advance_of_upper}"),
                timestamp = format!("{candidate}")
            );
            candidate
        } else {
            // The candidate is not readable (e.g. it is before `since`, or `since` is empty).
            coord_bail!(generate_timestamp_not_valid_error_msg(
                id_bundle,
                compute_instance,
                read_holds,
                candidate
            ));
        };
        Ok(RawTimestampDetermination {
            timestamp,
            constraints: None,
            session_oracle_read_ts,
        })
    }
401
402    /// Uses constraints and preferences to determine a timestamp for a query.
403    /// Returns the determined timestamp, the constraints that were applied, and
404    /// session_oracle_read_ts.
405    fn determine_timestamp_via_constraints(
406        session: &Session,
407        read_holds: &ReadHolds<Timestamp>,
408        id_bundle: &CollectionIdBundle,
409        when: &QueryWhen,
410        oracle_read_ts: Option<Timestamp>,
411        compute_instance: ComputeInstanceId,
412        real_time_recency_ts: Option<Timestamp>,
413        isolation_level: &IsolationLevel,
414        timeline: &Option<Timeline>,
415        largest_not_in_advance_of_upper: Timestamp,
416    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
417        use constraints::{Constraints, Preference, Reason};
418
419        let mut session_oracle_read_ts = None;
420        // We start by establishing the hard constraints that must be applied to timestamp determination.
421        // These constraints are derived from the input arguments, and properties of the collections involved.
422        // TODO: Many of the constraints are expressed obliquely, and could be made more direct.
423        let constraints = {
424            // Constraints we will populate through a sequence of opinions.
425            let mut constraints = Constraints::default();
426
427            // First, we have validity constraints from the `id_bundle` argument which indicates
428            // which collections we are reading from.
429            // TODO: Refine the detail about which identifiers are binding and which are not.
430            // TODO(dov): It's not entirely clear to me that there ever would be a non
431            // binding constraint introduced by the `id_bundle`. We should revisit this.
432            let since = read_holds.least_valid_read();
433            let storage = id_bundle
434                .storage_ids
435                .iter()
436                .cloned()
437                .collect::<Vec<GlobalId>>();
438            if !storage.is_empty() {
439                constraints
440                    .lower
441                    .push((since.clone(), Reason::StorageInput(storage)));
442            }
443            let compute = id_bundle
444                .compute_ids
445                .iter()
446                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
447                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
448            if !compute.is_empty() {
449                constraints
450                    .lower
451                    .push((since.clone(), Reason::ComputeInput(compute)));
452            }
453
454            // The query's `when` may indicates a specific timestamp we must advance to, or a specific value we must use.
455            if let Some(ts) = when.advance_to_timestamp() {
456                constraints
457                    .lower
458                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
459                // If the query is at a specific timestamp, we must introduce an upper bound as well.
460                if when.constrains_upper() {
461                    constraints
462                        .upper
463                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
464                }
465            }
466
467            // The specification of an `oracle_read_ts` may indicates that we must advance to it,
468            // except in one isolation mode, or if `when` does not indicate that we should.
469            // At the moment, only `QueryWhen::FreshestTableWrite` indicates that we should.
470            // TODO: Should this just depend on the isolation level?
471            if let Some(timestamp) = &oracle_read_ts {
472                if isolation_level != &IsolationLevel::StrongSessionSerializable
473                    || when.must_advance_to_timeline_ts()
474                {
475                    // When specification of an `oracle_read_ts` is required, we must advance to it.
476                    // If it's not present, lets bail out.
477                    constraints.lower.push((
478                        Antichain::from_elem(*timestamp),
479                        Reason::IsolationLevel(*isolation_level),
480                    ));
481                }
482            }
483
484            // If a real time recency timestamp is supplied, we must advance to it.
485            if let Some(real_time_recency_ts) = real_time_recency_ts {
486                assert!(
487                    session.vars().real_time_recency()
488                        && isolation_level == &IsolationLevel::StrictSerializable,
489                    "real time recency timestamp should only be supplied when real time recency \
490                                is enabled and the isolation level is strict serializable"
491                );
492                constraints.lower.push((
493                    Antichain::from_elem(real_time_recency_ts),
494                    Reason::RealTimeRecency,
495                ));
496            }
497
498            // If we are operating in Strong Session Serializable, we use an alternate timestamp lower bound.
499            if isolation_level == &IsolationLevel::StrongSessionSerializable {
500                if let Some(timeline) = &timeline {
501                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
502                        let session_ts = oracle.read_ts();
503                        constraints.lower.push((
504                            Antichain::from_elem(session_ts),
505                            Reason::IsolationLevel(*isolation_level),
506                        ));
507                        session_oracle_read_ts = Some(session_ts);
508                    }
509
510                    // When advancing the read timestamp under Strong Session Serializable, there is a
511                    // trade-off to make between freshness and latency. We can choose a timestamp close the
512                    // `upper`, but then later queries might block if the `upper` is too far into the
513                    // future. We can chose a timestamp close to the current time, but then we may not be
514                    // getting results that are as fresh as possible. As a heuristic, we choose the minimum
515                    // of now and the upper, where we use the global timestamp oracle read timestamp as a
516                    // proxy for now. If upper > now, then we choose now and prevent blocking future
517                    // queries. If upper < now, then we choose the upper and prevent blocking the current
518                    // query.
519                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
520                        let mut advance_to = largest_not_in_advance_of_upper;
521                        if let Some(oracle_read_ts) = oracle_read_ts {
522                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
523                        }
524                        constraints.lower.push((
525                            Antichain::from_elem(advance_to),
526                            Reason::IsolationLevel(*isolation_level),
527                        ));
528                    }
529                }
530            }
531
532            constraints.minimize();
533            constraints
534        };
535
536        // Next we establish the preferences that we would like to apply to timestamp determination.
537        // Generally, we want to choose the freshest timestamp possible, although there are exceptions
538        // when we either want a maximally *stale* timestamp, or we want to protect other queries from
539        // a recklessly advanced timestamp.
540        let preferences = {
541            // Counter-intuitively, the only `when` that allows `can_advance_to_upper` is `Immediately`,
542            // and not `FreshestTableWrite`. This is because `FreshestTableWrite` instead imposes a lower
543            // bound through the `oracle_read_ts`, and then requires the stalest valid timestamp.
544
545            if when.can_advance_to_upper()
546                && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
547            {
548                Preference::FreshestAvailable
549            } else {
550                Preference::StalestValid
551            }
552
553            // TODO: `StrongSessionSerializable` has a different set of preferences that starts to tease
554            // out the trade-off between freshness and responsiveness. I think we don't yet know enough
555            // to properly frame these preferences, though they are clearly aimed at the right concerns.
556        };
557
558        // Determine a candidate based on constraints and preferences.
559        let constraint_candidate = {
560            let mut candidate = Timestamp::minimum();
561            candidate.advance_by(constraints.lower_bound().borrow());
562            // If we have a preference to be the freshest available, advance to the minimum
563            // of the upper bound constraints and the `largest_not_in_advance_of_upper`.
564            if let Preference::FreshestAvailable = preferences {
565                let mut upper_bound = constraints.upper_bound();
566                upper_bound.insert(largest_not_in_advance_of_upper);
567                candidate.advance_by(upper_bound.borrow());
568            }
569            // If the candidate strictly exceeds the upper bound, we didn't have a viable timestamp.
570            if constraints.upper_bound().less_than(&candidate) {
571                coord_bail!(generate_timestamp_not_valid_error_msg(
572                    id_bundle,
573                    compute_instance,
574                    read_holds,
575                    candidate
576                ));
577            } else {
578                candidate
579            }
580        };
581
582        Ok(RawTimestampDetermination {
583            timestamp: constraint_candidate,
584            constraints: Some(constraints),
585            session_oracle_read_ts,
586        })
587    }
588
589    /// Determines the timestamp for a query.
590    ///
591    /// Timestamp determination may fail due to the restricted validity of
592    /// traces. Each has a `since` and `upper` frontier, and are only valid
593    /// after `since` and sure to be available not after `upper`.
594    ///
595    /// The timeline that `id_bundle` belongs to is also returned, if one exists.
596    fn determine_timestamp_for(
597        &self,
598        session: &Session,
599        id_bundle: &CollectionIdBundle,
600        when: &QueryWhen,
601        compute_instance: ComputeInstanceId,
602        timeline_context: &TimelineContext,
603        oracle_read_ts: Option<Timestamp>,
604        real_time_recency_ts: Option<Timestamp>,
605        isolation_level: &IsolationLevel,
606        constraint_based: &ConstraintBasedTimestampSelection,
607    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
608        // First, we acquire read holds that will ensure the queried collections
609        // stay queryable at the chosen timestamp.
610        let read_holds = self.acquire_read_holds(id_bundle);
611
612        let upper = self.least_valid_write(id_bundle);
613
614        Self::determine_timestamp_for_inner(
615            session,
616            id_bundle,
617            when,
618            compute_instance,
619            timeline_context,
620            oracle_read_ts,
621            real_time_recency_ts,
622            isolation_level,
623            constraint_based,
624            read_holds,
625            upper,
626        )
627    }
628
629    /// Same as determine_timestamp_for, but read_holds and least_valid_write are already passed in.
630    fn determine_timestamp_for_inner(
631        session: &Session,
632        id_bundle: &CollectionIdBundle,
633        when: &QueryWhen,
634        compute_instance: ComputeInstanceId,
635        timeline_context: &TimelineContext,
636        oracle_read_ts: Option<Timestamp>,
637        real_time_recency_ts: Option<Timestamp>,
638        isolation_level: &IsolationLevel,
639        constraint_based: &ConstraintBasedTimestampSelection,
640        read_holds: ReadHolds<Timestamp>,
641        upper: Antichain<Timestamp>,
642    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
643        let timeline = Self::get_timeline(timeline_context);
644        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
645        let since = read_holds.least_valid_read();
646
647        let raw_determination = match constraint_based {
648            ConstraintBasedTimestampSelection::Disabled => Self::determine_timestamp_classical(
649                session,
650                &read_holds,
651                id_bundle,
652                when,
653                oracle_read_ts,
654                compute_instance,
655                real_time_recency_ts,
656                isolation_level,
657                &timeline,
658                largest_not_in_advance_of_upper,
659                &since,
660            )?,
661            ConstraintBasedTimestampSelection::Enabled => {
662                Self::determine_timestamp_via_constraints(
663                    session,
664                    &read_holds,
665                    id_bundle,
666                    when,
667                    oracle_read_ts,
668                    compute_instance,
669                    real_time_recency_ts,
670                    isolation_level,
671                    &timeline,
672                    largest_not_in_advance_of_upper,
673                )?
674            }
675            ConstraintBasedTimestampSelection::Verify => {
676                let classical_determination = Self::determine_timestamp_classical(
677                    session,
678                    &read_holds,
679                    id_bundle,
680                    when,
681                    oracle_read_ts,
682                    compute_instance,
683                    real_time_recency_ts,
684                    isolation_level,
685                    &timeline,
686                    largest_not_in_advance_of_upper,
687                    &since,
688                )?;
689
690                match Self::determine_timestamp_via_constraints(
691                    session,
692                    &read_holds,
693                    id_bundle,
694                    when,
695                    oracle_read_ts,
696                    compute_instance,
697                    real_time_recency_ts,
698                    isolation_level,
699                    &timeline,
700                    largest_not_in_advance_of_upper,
701                ) {
702                    Ok(constraint_determination) => {
703                        soft_assert_eq_or_log!(
704                            classical_determination.timestamp,
705                            constraint_determination.timestamp,
706                            "timestamp determination mismatch"
707                        );
708                        if classical_determination.timestamp != constraint_determination.timestamp {
709                            tracing::info!(
710                                "timestamp constrains: {:?}",
711                                constraint_determination.constraints
712                            );
713                        }
714                        RawTimestampDetermination {
715                            timestamp: classical_determination.timestamp,
716                            constraints: constraint_determination.constraints,
717                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
718                        }
719                    }
720                    Err(e) => {
721                        event!(Level::ERROR, error = ?e, "constraint-based timestamp determination failed");
722                        RawTimestampDetermination {
723                            timestamp: classical_determination.timestamp,
724                            constraints: classical_determination.constraints,
725                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
726                        }
727                    }
728                }
729            }
730        };
731
732        let timestamp_context = TimestampContext::from_timeline_context(
733            raw_determination.timestamp,
734            oracle_read_ts,
735            timeline,
736            timeline_context,
737        );
738
739        let determination = TimestampDetermination {
740            timestamp_context,
741            since,
742            upper,
743            largest_not_in_advance_of_upper,
744            oracle_read_ts,
745            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
746            real_time_recency_ts,
747            constraints: raw_determination.constraints,
748        };
749
750        Ok((determination, read_holds))
751    }
752
    /// Acquires [ReadHolds] for every collection in `id_bundle`, at the earliest
    /// possible times.
    ///
    /// The holds are returned by value; the caller owns them and decides how long
    /// they are kept.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
756
757    /// The smallest common valid write frontier among the specified collections.
758    ///
759    /// Times that are not greater or equal to this frontier are complete for all collections
760    /// identified as arguments.
761    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
762        let mut upper = Antichain::new();
763        {
764            for (_id, _since, collection_upper) in
765                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
766            {
767                upper.extend(collection_upper);
768            }
769        }
770        {
771            for (instance, compute_ids) in &id_bundle.compute_ids {
772                for id in compute_ids.iter() {
773                    upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
774                }
775            }
776        }
777        upper
778    }
779
780    /// Returns `least_valid_write` - 1, i.e., each time in `least_valid_write` stepped back in a
781    /// saturating way.
782    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
783        let mut frontier = Antichain::new();
784        for t in self.least_valid_write(id_bundle) {
785            frontier.insert(t.step_back().unwrap_or(t));
786        }
787        frontier
788    }
789}
790
791fn generate_timestamp_not_valid_error_msg(
792    id_bundle: &CollectionIdBundle,
793    compute_instance: ComputeInstanceId,
794    read_holds: &ReadHolds<mz_repr::Timestamp>,
795    candidate: mz_repr::Timestamp,
796) -> String {
797    let mut invalid = Vec::new();
798
799    if let Some(compute_ids) = id_bundle.compute_ids.get(&compute_instance) {
800        for id in compute_ids {
801            let since = read_holds.since(id);
802            if !since.less_equal(&candidate) {
803                invalid.push((*id, since));
804            }
805        }
806    }
807
808    for id in id_bundle.storage_ids.iter() {
809        let since = read_holds.since(id);
810        if !since.less_equal(&candidate) {
811            invalid.push((*id, since));
812        }
813    }
814
815    format!(
816        "Timestamp ({}) is not valid for all inputs: {:?}",
817        candidate, invalid,
818    )
819}
820
821impl Coordinator {
822    pub(crate) async fn oracle_read_ts(
823        &self,
824        session: &Session,
825        timeline_ctx: &TimelineContext,
826        when: &QueryWhen,
827    ) -> Option<Timestamp> {
828        let isolation_level = session.vars().transaction_isolation().clone();
829        let timeline = Coordinator::get_timeline(timeline_ctx);
830        let needs_linearized_read_ts =
831            Coordinator::needs_linearized_read_ts(&isolation_level, when);
832
833        let oracle_read_ts = match timeline {
834            Some(timeline) if needs_linearized_read_ts => {
835                let timestamp_oracle = self.get_timestamp_oracle(&timeline);
836                Some(timestamp_oracle.read_ts().await)
837            }
838            Some(_) | None => None,
839        };
840
841        oracle_read_ts
842    }
843
844    /// Determines the timestamp for a query, acquires read holds that ensure the
845    /// query remains executable at that time, and returns those.
846    /// The caller is responsible for eventually dropping those read holds.
847    #[mz_ore::instrument(level = "debug")]
848    pub(crate) fn determine_timestamp(
849        &self,
850        session: &Session,
851        id_bundle: &CollectionIdBundle,
852        when: &QueryWhen,
853        compute_instance: ComputeInstanceId,
854        timeline_context: &TimelineContext,
855        oracle_read_ts: Option<Timestamp>,
856        real_time_recency_ts: Option<mz_repr::Timestamp>,
857    ) -> Result<
858        (
859            TimestampDetermination<mz_repr::Timestamp>,
860            ReadHolds<mz_repr::Timestamp>,
861        ),
862        AdapterError,
863    > {
864        let constraint_based = ConstraintBasedTimestampSelection::from_str(
865            &CONSTRAINT_BASED_TIMESTAMP_SELECTION
866                .get(self.catalog_state().system_config().dyncfgs()),
867        );
868
869        let isolation_level = session.vars().transaction_isolation();
870        let (det, read_holds) = self.determine_timestamp_for(
871            session,
872            id_bundle,
873            when,
874            compute_instance,
875            timeline_context,
876            oracle_read_ts,
877            real_time_recency_ts,
878            isolation_level,
879            &constraint_based,
880        )?;
881        self.metrics
882            .determine_timestamp
883            .with_label_values(&[
884                match det.respond_immediately() {
885                    true => "true",
886                    false => "false",
887                },
888                isolation_level.as_str(),
889                &compute_instance.to_string(),
890                constraint_based.as_str(),
891            ])
892            .inc();
893        if !det.respond_immediately()
894            && isolation_level == &IsolationLevel::StrictSerializable
895            && real_time_recency_ts.is_none()
896        {
897            // Note down the difference between StrictSerializable and Serializable into a metric.
898            if let Some(strict) = det.timestamp_context.timestamp() {
899                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
900                    session,
901                    id_bundle,
902                    when,
903                    compute_instance,
904                    timeline_context,
905                    oracle_read_ts,
906                    real_time_recency_ts,
907                    &IsolationLevel::Serializable,
908                    &constraint_based,
909                )?;
910
911                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
912                    self.metrics
913                        .timestamp_difference_for_strict_serializable_ms
914                        .with_label_values(&[
915                            compute_instance.to_string().as_ref(),
916                            constraint_based.as_str(),
917                        ])
918                        .observe(f64::cast_lossy(u64::from(
919                            strict.saturating_sub(*serializable),
920                        )));
921                }
922            }
923        }
924        Ok((det, read_holds))
925    }
926
927    /// The largest timestamp not greater or equal to an element of `upper`.
928    ///
929    /// If no such timestamp exists, for example because `upper` contains only the
930    /// minimal timestamp, the return value is `Timestamp::minimum()`.
931    pub(crate) fn largest_not_in_advance_of_upper(
932        upper: &Antichain<mz_repr::Timestamp>,
933    ) -> mz_repr::Timestamp {
934        // We peek at the largest element not in advance of `upper`, which
935        // involves a subtraction. If `upper` contains a zero timestamp there
936        // is no "prior" answer, and we do not want to peek at it as it risks
937        // hanging awaiting the response to data that may never arrive.
938        if let Some(upper) = upper.as_option() {
939            upper.step_back().unwrap_or_else(Timestamp::minimum)
940        } else {
941            // A complete trace can be read in its final form with this time.
942            //
943            // This should only happen for literals that have no sources or sources that
944            // are known to have completed (non-tailed files for example).
945            Timestamp::MAX
946        }
947    }
948}
949
/// Information used when determining the timestamp for a query.
///
/// Besides the chosen timestamp (inside `timestamp_context`), this records the inputs
/// that went into the choice, so the decision can later be explained to users.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp context from `determine_timestamp`.
    pub timestamp_context: TimestampContext<T>,
    /// The read frontier of all involved collections (the least valid read of their read holds).
    pub since: Antichain<T>,
    /// The write frontier of all involved collections.
    pub upper: Antichain<T>,
    /// The largest timestamp not in advance of upper.
    pub largest_not_in_advance_of_upper: T,
    /// The value of the timeline's oracle timestamp, if used.
    pub oracle_read_ts: Option<T>,
    /// The value of the session local timestamp's oracle timestamp, if used.
    pub session_oracle_read_ts: Option<T>,
    /// The value of the real time recency timestamp, if used.
    pub real_time_recency_ts: Option<T>,
    /// The constraints used by the constraint based solver, if any were recorded.
    /// See the [`constraints`] module for more information.
    pub constraints: Option<Constraints>,
}
971
972impl<T: TimestampManipulation> TimestampDetermination<T> {
973    pub fn respond_immediately(&self) -> bool {
974        match &self.timestamp_context {
975            TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
976                !self.upper.less_equal(chosen_ts)
977            }
978            TimestampContext::NoTimestamp => true,
979        }
980    }
981}
982
/// An explanation of how a query's timestamp was determined.
///
/// Its `Display` implementation renders the determination, the frontiers of each
/// source, and any binding constraints as a human-readable report.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
    /// The chosen timestamp from `determine_timestamp`.
    pub determination: TimestampDetermination<T>,
    /// Details about each source.
    pub sources: Vec<TimestampSource<T>>,
    /// Wall time of first statement executed in this transaction
    pub session_wall_time: DateTime<Utc>,
    /// Cached value of determination.respond_immediately()
    pub respond_immediately: bool,
}
995
/// The frontiers of a single input, as reported in a [`TimestampExplanation`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
    /// The name under which the input is reported.
    pub name: String,
    /// The elements of the input's read frontier.
    pub read_frontier: Vec<T>,
    /// The elements of the input's write frontier.
    pub write_frontier: Vec<T>,
}
1002
/// Types that can be formatted relative to a [`Timeline`].
///
/// Knowing the timeline lets an implementation add context to the raw value. For
/// example, `mz_repr::Timestamp` additionally renders a calendar date when the timeline
/// is `Timeline::EpochMilliseconds`.
pub trait DisplayableInTimeline {
    /// Writes `self` to `f`, interpreting it relative to `timeline` if one is given.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Returns a wrapper whose `Display`/`Debug` output is produced by
    /// [`DisplayableInTimeline::fmt`] with the given `timeline`.
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
1009
1010impl DisplayableInTimeline for mz_repr::Timestamp {
1011    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1012        if let Some(Timeline::EpochMilliseconds) = timeline {
1013            let ts_ms: u64 = self.into();
1014            if let Ok(ts_ms) = i64::try_from(ts_ms) {
1015                if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
1016                    return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
1017                }
1018            }
1019        }
1020        write!(f, "{:13}", self)
1021    }
1022}
1023
/// Adapter returned by [`DisplayableInTimeline::display`] that pairs a value with the
/// timeline it should be rendered in.
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to render.
    t: &'a T,
    /// The timeline to render `t` in, if any.
    timeline: Option<&'a Timeline>,
}
1028impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
1029where
1030    T: DisplayableInTimeline,
1031{
1032    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1033        self.t.fmt(self.timeline, f)
1034    }
1035}
1036
1037impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
1038where
1039    T: DisplayableInTimeline,
1040{
1041    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1042        fmt::Display::fmt(&self, f)
1043    }
1044}
1045
1046impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
1047    for TimestampExplanation<T>
1048{
1049    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1050        let timeline = self.determination.timestamp_context.timeline();
1051        writeln!(
1052            f,
1053            "                query timestamp: {}",
1054            self.determination
1055                .timestamp_context
1056                .timestamp_or_default()
1057                .display(timeline)
1058        )?;
1059        if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
1060            writeln!(
1061                f,
1062                "          oracle read timestamp: {}",
1063                oracle_read_ts.display(timeline)
1064            )?;
1065        }
1066        if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
1067            writeln!(
1068                f,
1069                "  session oracle read timestamp: {}",
1070                session_oracle_read_ts.display(timeline)
1071            )?;
1072        }
1073        if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
1074            writeln!(
1075                f,
1076                "    real time recency timestamp: {}",
1077                real_time_recency_ts.display(timeline)
1078            )?;
1079        }
1080        writeln!(
1081            f,
1082            "largest not in advance of upper: {}",
1083            self.determination
1084                .largest_not_in_advance_of_upper
1085                .display(timeline),
1086        )?;
1087        writeln!(
1088            f,
1089            "                          upper:{:?}",
1090            self.determination
1091                .upper
1092                .iter()
1093                .map(|t| t.display(timeline))
1094                .collect::<Vec<_>>()
1095        )?;
1096        writeln!(
1097            f,
1098            "                          since:{:?}",
1099            self.determination
1100                .since
1101                .iter()
1102                .map(|t| t.display(timeline))
1103                .collect::<Vec<_>>()
1104        )?;
1105        writeln!(
1106            f,
1107            "        can respond immediately: {}",
1108            self.respond_immediately
1109        )?;
1110        writeln!(f, "                       timeline: {:?}", &timeline)?;
1111        writeln!(
1112            f,
1113            "              session wall time: {:13} ({})",
1114            self.session_wall_time.timestamp_millis(),
1115            self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
1116        )?;
1117
1118        for source in &self.sources {
1119            writeln!(f, "")?;
1120            writeln!(f, "source {}:", source.name)?;
1121            writeln!(
1122                f,
1123                "                  read frontier:{:?}",
1124                source
1125                    .read_frontier
1126                    .iter()
1127                    .map(|t| t.display(timeline))
1128                    .collect::<Vec<_>>()
1129            )?;
1130            writeln!(
1131                f,
1132                "                 write frontier:{:?}",
1133                source
1134                    .write_frontier
1135                    .iter()
1136                    .map(|t| t.display(timeline))
1137                    .collect::<Vec<_>>()
1138            )?;
1139        }
1140
1141        if let Some(constraints) = &self.determination.constraints {
1142            writeln!(f, "")?;
1143            writeln!(f, "binding constraints:")?;
1144            write!(f, "{}", constraints.display(timeline))?;
1145        }
1146
1147        Ok(())
1148    }
1149}
1150
1151/// Types and logic in support of a constraint-based approach to timestamp determination.
1152mod constraints {
1153
1154    use core::fmt;
1155    use std::fmt::Debug;
1156
1157    use differential_dataflow::lattice::Lattice;
1158    use mz_storage_types::sources::Timeline;
1159    use serde::{Deserialize, Serialize};
1160    use timely::progress::{Antichain, Timestamp};
1161
1162    use mz_compute_types::ComputeInstanceId;
1163    use mz_repr::GlobalId;
1164    use mz_sql::session::vars::IsolationLevel;
1165
1166    use super::DisplayableInTimeline;
1167
1168    /// Constraints expressed on the timestamp of a query.
1169    ///
1170    /// The constraints are expressed on the minimum and maximum values,
1171    /// resulting in a (possibly empty) interval of valid timestamps.
1172    ///
1173    /// The constraints may be redundant, in the interest of providing
1174    /// more complete explanations, but they may also be minimized at
1175    /// any point without altering their behavior by removing redundant
1176    /// constraints.
1177    ///
1178    /// When combined with a `Preference` one can determine an
1179    /// ideal timestamp to use.
1180    #[derive(Default, Serialize, Deserialize, Clone)]
1181    pub struct Constraints {
1182        /// Timestamps and reasons that impose an inclusive lower bound.
1183        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1184        /// Timestamps and reasons that impose an inclusive upper bound.
1185        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1186    }
1187
1188    impl DisplayableInTimeline for Constraints {
1189        fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1190            if !self.lower.is_empty() {
1191                writeln!(f, "lower:")?;
1192                for (ts, reason) in &self.lower {
1193                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1194                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1195                }
1196            }
1197            if !self.upper.is_empty() {
1198                writeln!(f, "upper:")?;
1199                for (ts, reason) in &self.upper {
1200                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1201                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1202                }
1203            }
1204            Ok(())
1205        }
1206    }
1207
1208    impl Debug for Constraints {
1209        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1210            self.display(None).fmt(f)?;
1211            Ok(())
1212        }
1213    }
1214
1215    impl Constraints {
1216        /// Remove constraints that are dominated by other constraints.
1217        ///
1218        /// This removes redundant constraints, without removing constraints
1219        /// that are "tight" in the sense that the interval would be
1220        /// meaningfully different without them.
1221        /// For example, two constraints at the same
1222        /// time will both be retained, in the interest of full information.
1223        /// But a lower bound constraint at time `t` will be removed if there is a
1224        /// constraint at time `t + 1` (or any larger time).
1225        pub fn minimize(&mut self) {
1226            // Establish the upper bound of lower constraints.
1227            let lower_frontier = self.lower_bound();
1228            // Retain constraints that intersect `lower_frontier`.
1229            self.lower.retain(|(anti, _)| {
1230                anti.iter()
1231                    .any(|time| lower_frontier.elements().contains(time))
1232            });
1233
1234            // Establish the lower bound of upper constraints.
1235            let upper_frontier = self.upper_bound();
1236            // Retain constraints that intersect `upper_frontier`.
1237            self.upper.retain(|(anti, _)| {
1238                anti.iter()
1239                    .any(|time| upper_frontier.elements().contains(time))
1240            });
1241        }
1242
1243        /// An antichain equal to the least upper bound of lower bounds.
1244        pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1245            let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1246            for (anti, _) in self.lower.iter() {
1247                lower = lower.join(anti);
1248            }
1249            lower
1250        }
1251        /// An antichain equal to the greatest lower bound of upper bounds.
1252        pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1253            self.upper
1254                .iter()
1255                .flat_map(|(anti, _)| anti.iter())
1256                .cloned()
1257                .collect()
1258        }
1259    }
1260
1261    /// An explanation of reasons for a timestamp constraint.
1262    #[derive(Serialize, Deserialize, Clone)]
1263    pub enum Reason {
1264        /// A compute input at a compute instance.
1265        /// This is something like an index or view
1266        /// that is mantained by compute.
1267        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1268        /// A storage input.
1269        StorageInput(Vec<GlobalId>),
1270        /// A specified isolation level and the timestamp it requires.
1271        IsolationLevel(IsolationLevel),
1272        /// Real-time recency may constrains the timestamp from below.
1273        RealTimeRecency,
1274        /// The query expressed its own constraint on the timestamp.
1275        QueryAsOf,
1276    }
1277
1278    impl Debug for Reason {
1279        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1280            match self {
1281                Reason::ComputeInput(ids) => write_split_ids(f, "ComputeInput", ids),
1282                Reason::StorageInput(ids) => write_split_ids(f, "StorageInput", ids),
1283                Reason::IsolationLevel(level) => {
1284                    write!(f, "IsolationLevel({:?})", level)
1285                }
1286                Reason::RealTimeRecency => {
1287                    write!(f, "RealTimeRecency")
1288                }
1289                Reason::QueryAsOf => {
1290                    write!(f, "QueryAsOf")
1291                }
1292            }
1293        }
1294    }
1295
1296    //TODO: This is a bit of a hack to make the debug output of constraints more readable.
1297    //We should probably have a more structured way to do this.
1298    fn write_split_ids<T: Debug>(f: &mut fmt::Formatter, label: &str, ids: &[T]) -> fmt::Result {
1299        let (ids, rest) = if ids.len() > 10 {
1300            ids.split_at(10)
1301        } else {
1302            let rest: &[T] = &[];
1303            (ids, rest)
1304        };
1305        if rest.is_empty() {
1306            write!(f, "{}({:?})", label, ids)
1307        } else {
1308            write!(f, "{}({:?}, ... {} more)", label, ids, rest.len())
1309        }
1310    }
1311
1312    /// Given an interval [read, write) of timestamp options,
1313    /// this expresses a preference for either end of the spectrum.
1314    pub enum Preference {
1315        /// Prefer the greatest timestamp immediately available.
1316        ///
1317        /// This considers the immediate inputs to a query and
1318        /// selects the greatest timestamp not greater or equal
1319        /// to any of their write frontiers.
1320        ///
1321        /// The preference only relates to immediate query inputs,
1322        /// but it could be extended to transitive inputs as well.
1323        /// For example, one could imagine prefering the freshest
1324        /// data known to be ingested into Materialize, under the
1325        /// premise that those answers should soon become available,
1326        /// and may be more fresh than the immediate inputs.
1327        FreshestAvailable,
1328        /// Prefer the least valid timeastamp.
1329        ///
1330        /// This is useful when one has no expressed freshness
1331        /// constraints, and wants to minimally impact others.
1332        /// For example, `AS OF AT LEAST <time>`.
1333        StalestValid,
1334    }
1335}