mz_adapter/coord/
timestamp_selection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for selecting timestamps for various operations on collections.
11
12use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
20use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::MirScalarExpr;
23use mz_ore::cast::CastLossy;
24use mz_ore::soft_assert_eq_or_log;
25use mz_repr::explain::ExprHumanizer;
26use mz_repr::{GlobalId, RowArena, ScalarType, Timestamp, TimestampManipulation};
27use mz_sql::plan::QueryWhen;
28use mz_sql::session::metadata::SessionMetadata;
29use mz_sql::session::vars::IsolationLevel;
30use mz_storage_types::sources::Timeline;
31use serde::{Deserialize, Serialize};
32use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
33use tracing::{Level, event};
34
35use crate::AdapterError;
36use crate::catalog::CatalogState;
37use crate::coord::Coordinator;
38use crate::coord::id_bundle::CollectionIdBundle;
39use crate::coord::read_policy::ReadHolds;
40use crate::coord::timeline::TimelineContext;
41use crate::optimize::dataflows::{ExprPrepStyle, prep_scalar_expr};
42use crate::session::Session;
43
/// The timeline and timestamp context of a read.
///
/// A read either happens at a concrete timestamp inside a timeline
/// (`TimelineTimestamp`), or it does not depend on any timestamp at all
/// (`NoTimestamp`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext<T> {
    /// Read is executed in a specific timeline with a specific timestamp.
    TimelineTimestamp {
        /// The timeline the read is executed in.
        timeline: Timeline,
        /// The timestamp that was chosen for a read. This can differ from the
        /// `oracle_ts` when collections are not readable at the (linearized)
        /// timestamp for the oracle. In those cases (when the chosen timestamp
        /// is further ahead than the oracle timestamp) we have to delay
        /// returning peek results until the timestamp oracle is also
        /// sufficiently advanced.
        chosen_ts: T,
        /// The timestamp that would have been chosen for the read by the
        /// (linearized) timestamp oracle. In most cases this will be picked as
        /// the `chosen_ts`. `None` if no oracle timestamp was supplied.
        oracle_ts: Option<T>,
    },
    /// Read is executed without a timeline or timestamp.
    NoTimestamp,
}
65
66impl<T: TimestampManipulation> TimestampContext<T> {
67    /// Creates a `TimestampContext` from a timestamp and `TimelineContext`.
68    pub fn from_timeline_context(
69        chosen_ts: T,
70        oracle_ts: Option<T>,
71        transaction_timeline: Option<Timeline>,
72        timeline_context: &TimelineContext,
73    ) -> TimestampContext<T> {
74        match timeline_context {
75            TimelineContext::TimelineDependent(timeline) => {
76                if let Some(transaction_timeline) = transaction_timeline {
77                    assert_eq!(timeline, &transaction_timeline);
78                }
79                Self::TimelineTimestamp {
80                    timeline: timeline.clone(),
81                    chosen_ts,
82                    oracle_ts,
83                }
84            }
85            TimelineContext::TimestampDependent => {
86                // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
87                Self::TimelineTimestamp {
88                    timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
89                    chosen_ts,
90                    oracle_ts,
91                }
92            }
93            TimelineContext::TimestampIndependent => Self::NoTimestamp,
94        }
95    }
96
97    /// The timeline belonging to this context, if one exists.
98    pub fn timeline(&self) -> Option<&Timeline> {
99        self.timeline_timestamp().map(|tt| tt.0)
100    }
101
102    /// The timestamp belonging to this context, if one exists.
103    pub fn timestamp(&self) -> Option<&T> {
104        self.timeline_timestamp().map(|tt| tt.1)
105    }
106
107    /// The timeline and timestamp belonging to this context, if one exists.
108    pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
109        match self {
110            Self::TimelineTimestamp {
111                timeline,
112                chosen_ts,
113                ..
114            } => Some((timeline, chosen_ts)),
115            Self::NoTimestamp => None,
116        }
117    }
118
119    /// The timestamp belonging to this context, or a sensible default if one does not exists.
120    pub fn timestamp_or_default(&self) -> T {
121        match self {
122            Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
123            // Anything without a timestamp is given the maximum possible timestamp to indicate
124            // that they have been closed up until the end of time. This allows us to SUBSCRIBE to
125            // static views.
126            Self::NoTimestamp => T::maximum(),
127        }
128    }
129
130    /// Whether or not the context contains a timestamp.
131    pub fn contains_timestamp(&self) -> bool {
132        self.timestamp().is_some()
133    }
134
135    /// Converts this `TimestampContext` to an `Antichain`.
136    pub fn antichain(&self) -> Antichain<T> {
137        Antichain::from_elem(self.timestamp_or_default())
138    }
139}
140
141#[async_trait(?Send)]
142impl TimestampProvider for Coordinator {
143    /// Reports a collection's current read frontier.
144    fn compute_read_frontier(
145        &self,
146        instance: ComputeInstanceId,
147        id: GlobalId,
148    ) -> Antichain<Timestamp> {
149        self.controller
150            .compute
151            .collection_frontiers(id, Some(instance))
152            .expect("id does not exist")
153            .read_frontier
154    }
155
156    /// Reports a collection's current write frontier.
157    fn compute_write_frontier(
158        &self,
159        instance: ComputeInstanceId,
160        id: GlobalId,
161    ) -> Antichain<Timestamp> {
162        self.controller
163            .compute
164            .collection_frontiers(id, Some(instance))
165            .expect("id does not exist")
166            .write_frontier
167    }
168
169    fn storage_frontiers(
170        &self,
171        ids: Vec<GlobalId>,
172    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
173        self.controller
174            .storage
175            .collections_frontiers(ids)
176            .expect("missing collections")
177    }
178
179    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
180        self.acquire_read_holds(id_bundle)
181    }
182
183    fn catalog_state(&self) -> &CatalogState {
184        self.catalog().state()
185    }
186}
187
/// A timestamp determination, which includes the timestamp, constraints, and session oracle read
/// timestamp.
///
/// This is the intermediate result produced by `determine_timestamp_classical` and
/// `determine_timestamp_via_constraints`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination<T> {
    /// The timestamp that was selected for the query.
    pub timestamp: T,
    /// The constraints that were applied during selection.
    /// `determine_timestamp_classical` always sets this to `None`;
    /// `determine_timestamp_via_constraints` always sets it to `Some`.
    pub constraints: Option<Constraints>,
    /// The read timestamp of the session's timestamp oracle. Only populated when the isolation
    /// level is Strong Session Serializable and the session has an oracle for the query's
    /// timeline; `None` otherwise.
    pub session_oracle_read_ts: Option<T>,
}
196
197#[async_trait(?Send)]
198pub trait TimestampProvider {
    /// Reports the current read frontier of the compute collection `id` on the compute
    /// instance `instance`.
    fn compute_read_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
    /// Reports the current write frontier of the compute collection `id` on the compute
    /// instance `instance`.
    fn compute_write_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
209
    /// Returns the implied capability (since) and write frontier (upper) for
    /// the specified storage collections.
    ///
    /// NOTE(review): each tuple is presumably `(id, since, upper)`, in the order the
    /// sentence above lists them; confirm against the storage controller.
    fn storage_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
216
    /// Returns the catalog state consulted during timestamp selection. It is passed to
    /// `Coordinator::evaluate_when` to evaluate the timestamp a query asks to advance to.
    fn catalog_state(&self) -> &CatalogState;
218
219    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
220        let timeline = match timeline_context {
221            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
222            // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
223            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
224            TimelineContext::TimestampIndependent => None,
225        };
226
227        timeline
228    }
229
230    /// Returns true if-and-only-if the given configuration needs a linearized
231    /// read timestamp from a timestamp oracle.
232    ///
233    /// This assumes that the query happens in the context of a timeline. If
234    /// there is no timeline, we cannot and don't have to get a linearized read
235    /// timestamp.
236    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
237        // When we're in the context of a timeline (assumption) and one of these
238        // scenarios hold, we need to use a linearized read timestamp:
239        // - The isolation level is Strict Serializable and the `when` allows us to use the
240        //   the timestamp oracle (ex: queries with no AS OF).
241        // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries).
242        when.must_advance_to_timeline_ts()
243            || (when.can_advance_to_timeline_ts()
244                && matches!(
245                    isolation_level,
246                    IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
247                ))
248    }
249
250    /// Determines the timestamp for a query using the classical logic (as opposed to constraint-based).
251    fn determine_timestamp_classical(
252        &self,
253        session: &Session,
254        read_holds: &ReadHolds<Timestamp>,
255        id_bundle: &CollectionIdBundle,
256        when: &QueryWhen,
257        oracle_read_ts: Option<Timestamp>,
258        compute_instance: ComputeInstanceId,
259        real_time_recency_ts: Option<Timestamp>,
260        isolation_level: &IsolationLevel,
261        timeline: &Option<Timeline>,
262        largest_not_in_advance_of_upper: Timestamp,
263        since: &Antichain<Timestamp>,
264    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
265        let mut session_oracle_read_ts = None;
266        // Each involved trace has a validity interval `[since, upper)`.
267        // The contents of a trace are only guaranteed to be correct when
268        // accumulated at a time greater or equal to `since`, and they
269        // are only guaranteed to be currently present for times not
270        // greater or equal to `upper`.
271        //
272        // The plan is to first determine a timestamp, based on the requested
273        // timestamp policy, and then determine if it can be satisfied using
274        // the compacted arrangements we have at hand. It remains unresolved
275        // what to do if it cannot be satisfied (perhaps the query should use
276        // a larger timestamp and block, perhaps the user should intervene).
277
278        {
279            // TODO: We currently split out getting the oracle timestamp because
280            // it's a potentially expensive call, but a call that can be done in an
281            // async task. TimestampProvider is not Send (nor Sync), so we cannot do
282            // the call to `determine_timestamp_for` (including the oracle call) on
283            // an async task. If/when TimestampProvider can become Send, we can fold
284            // the call to the TimestampOracle back into this function.
285            //
286            // We assert here that the logic that determines the oracle timestamp
287            // matches our expectations.
288
289            if timeline.is_some() && Self::needs_linearized_read_ts(isolation_level, when) {
290                assert!(
291                    oracle_read_ts.is_some(),
292                    "should get a timestamp from the oracle for linearized timeline {:?} but didn't",
293                    timeline
294                );
295            }
296        }
297
298        // Initialize candidate to the minimum correct time.
299        let mut candidate = Timestamp::minimum();
300
301        if let Some(timestamp) = when.advance_to_timestamp() {
302            let catalog_state = self.catalog_state();
303            let ts = Coordinator::evaluate_when(catalog_state, timestamp, session)?;
304            candidate.join_assign(&ts);
305        }
306
307        if when.advance_to_since() {
308            candidate.advance_by(since.borrow());
309        }
310
311        // If we've acquired a read timestamp from the timestamp oracle, use it
312        // as the new lower bound for the candidate.
313        // In Strong Session Serializable, we ignore the oracle timestamp for now, unless we need
314        // to use it.
315        if let Some(timestamp) = &oracle_read_ts {
316            if isolation_level != &IsolationLevel::StrongSessionSerializable
317                || when.must_advance_to_timeline_ts()
318            {
319                candidate.join_assign(timestamp);
320            }
321        }
322
323        // We advance to the upper in the following scenarios:
324        // - The isolation level is Serializable and the `when` allows us to advance to upper (ex:
325        //   queries with no AS OF). We avoid using the upper in Strict Serializable to prevent
326        //   reading source data that is being written to in the future.
327        // - The isolation level is Strict Serializable but there is no timelines and the `when`
328        //   allows us to advance to upper.
329        if when.can_advance_to_upper()
330            && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
331        {
332            candidate.join_assign(&largest_not_in_advance_of_upper);
333        }
334
335        if let Some(real_time_recency_ts) = real_time_recency_ts {
336            if !(session.vars().real_time_recency()
337                && isolation_level == &IsolationLevel::StrictSerializable)
338            {
339                // Erring on the side of caution, lets bail out here.
340                // This should never happen in practice, as the real time recency timestamp should
341                // only be supplied when real time recency is enabled.
342                coord_bail!(
343                    "real time recency timestamp should only be supplied when real time recency \
344                            is enabled and the isolation level is strict serializable"
345                );
346            }
347            candidate.join_assign(&real_time_recency_ts);
348        }
349
350        if isolation_level == &IsolationLevel::StrongSessionSerializable {
351            if let Some(timeline) = &timeline {
352                if let Some(oracle) = session.get_timestamp_oracle(timeline) {
353                    let session_ts = oracle.read_ts();
354                    candidate.join_assign(&session_ts);
355                    session_oracle_read_ts = Some(session_ts);
356                }
357            }
358
359            // When advancing the read timestamp under Strong Session Serializable, there is a
360            // trade-off to make between freshness and latency. We can choose a timestamp close the
361            // `upper`, but then later queries might block if the `upper` is too far into the
362            // future. We can chose a timestamp close to the current time, but then we may not be
363            // getting results that are as fresh as possible. As a heuristic, we choose the minimum
364            // of now and the upper, where we use the global timestamp oracle read timestamp as a
365            // proxy for now. If upper > now, then we choose now and prevent blocking future
366            // queries. If upper < now, then we choose the upper and prevent blocking the current
367            // query.
368            if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
369                let mut advance_to = largest_not_in_advance_of_upper;
370                if let Some(oracle_read_ts) = oracle_read_ts {
371                    advance_to = std::cmp::min(advance_to, oracle_read_ts);
372                }
373                candidate.join_assign(&advance_to);
374            }
375        }
376
377        // If the timestamp is greater or equal to some element in `since` we are
378        // assured that the answer will be correct.
379        //
380        // It's ok for this timestamp to be larger than the current timestamp of
381        // the timestamp oracle. For Strict Serializable queries, the Coord will
382        // linearize the query by holding back the result until the timestamp
383        // oracle catches up.
384        let timestamp = if since.less_equal(&candidate) {
385            event!(
386                Level::DEBUG,
387                conn_id = format!("{}", session.conn_id()),
388                since = format!("{since:?}"),
389                largest_not_in_advance_of_upper = format!("{largest_not_in_advance_of_upper}"),
390                timestamp = format!("{candidate}")
391            );
392            candidate
393        } else {
394            coord_bail!(generate_timestamp_not_valid_error_msg(
395                id_bundle,
396                compute_instance,
397                read_holds,
398                candidate
399            ));
400        };
401        Ok(RawTimestampDetermination {
402            timestamp,
403            constraints: None,
404            session_oracle_read_ts,
405        })
406    }
407
408    /// Uses constraints and preferences to determine a timestamp for a query.
409    /// Returns the determined timestamp, the constraints that were applied, and
410    /// session_oracle_read_ts.
411    fn determine_timestamp_via_constraints(
412        &self,
413        session: &Session,
414        read_holds: &ReadHolds<Timestamp>,
415        id_bundle: &CollectionIdBundle,
416        when: &QueryWhen,
417        oracle_read_ts: Option<Timestamp>,
418        compute_instance: ComputeInstanceId,
419        real_time_recency_ts: Option<Timestamp>,
420        isolation_level: &IsolationLevel,
421        timeline: &Option<Timeline>,
422        largest_not_in_advance_of_upper: Timestamp,
423    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
424        use constraints::{Constraints, Preference, Reason};
425
426        let mut session_oracle_read_ts = None;
427        // We start by establishing the hard constraints that must be applied to timestamp determination.
428        // These constraints are derived from the input arguments, and properties of the collections involved.
429        // TODO: Many of the constraints are expressed obliquely, and could be made more direct.
430        let constraints = {
431            // Constraints we will populate through a sequence of opinions.
432            let mut constraints = Constraints::default();
433
434            // First, we have validity constraints from the `id_bundle` argument which indicates
435            // which collections we are reading from.
436            // TODO: Refine the detail about which identifiers are binding and which are not.
437            // TODO(dov): It's not entirely clear to me that there ever would be a non
438            // binding constraint introduced by the `id_bundle`. We should revisit this.
439            let since = read_holds.least_valid_read();
440            let storage = id_bundle
441                .storage_ids
442                .iter()
443                .cloned()
444                .collect::<Vec<GlobalId>>();
445            if !storage.is_empty() {
446                constraints
447                    .lower
448                    .push((since.clone(), Reason::StorageInput(storage)));
449            }
450            let compute = id_bundle
451                .compute_ids
452                .iter()
453                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
454                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
455            if !compute.is_empty() {
456                constraints
457                    .lower
458                    .push((since.clone(), Reason::ComputeInput(compute)));
459            }
460
461            // The query's `when` may indicates a specific timestamp we must advance to, or a specific value we must use.
462            if let Some(timestamp) = when.advance_to_timestamp() {
463                let catalog_state = self.catalog_state();
464                let ts = Coordinator::evaluate_when(catalog_state, timestamp, session)?;
465                constraints
466                    .lower
467                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
468                // If the query is at a specific timestamp, we must introduce an upper bound as well.
469                if when.constrains_upper() {
470                    constraints
471                        .upper
472                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
473                }
474            }
475
476            // The specification of an `oracle_read_ts` may indicates that we must advance to it,
477            // except in one isolation mode, or if `when` does not indicate that we should.
478            // At the moment, only `QueryWhen::FreshestTableWrite` indicates that we should.
479            // TODO: Should this just depend on the isolation level?
480            if let Some(timestamp) = &oracle_read_ts {
481                if isolation_level != &IsolationLevel::StrongSessionSerializable
482                    || when.must_advance_to_timeline_ts()
483                {
484                    // When specification of an `oracle_read_ts` is required, we must advance to it.
485                    // If it's not present, lets bail out.
486                    constraints.lower.push((
487                        Antichain::from_elem(*timestamp),
488                        Reason::IsolationLevel(*isolation_level),
489                    ));
490                }
491            }
492
493            // If a real time recency timestamp is supplied, we must advance to it.
494            if let Some(real_time_recency_ts) = real_time_recency_ts {
495                assert!(
496                    session.vars().real_time_recency()
497                        && isolation_level == &IsolationLevel::StrictSerializable,
498                    "real time recency timestamp should only be supplied when real time recency \
499                                is enabled and the isolation level is strict serializable"
500                );
501                constraints.lower.push((
502                    Antichain::from_elem(real_time_recency_ts),
503                    Reason::RealTimeRecency,
504                ));
505            }
506
507            // If we are operating in Strong Session Serializable, we use an alternate timestamp lower bound.
508            if isolation_level == &IsolationLevel::StrongSessionSerializable {
509                if let Some(timeline) = &timeline {
510                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
511                        let session_ts = oracle.read_ts();
512                        constraints.lower.push((
513                            Antichain::from_elem(session_ts),
514                            Reason::IsolationLevel(*isolation_level),
515                        ));
516                        session_oracle_read_ts = Some(session_ts);
517                    }
518
519                    // When advancing the read timestamp under Strong Session Serializable, there is a
520                    // trade-off to make between freshness and latency. We can choose a timestamp close the
521                    // `upper`, but then later queries might block if the `upper` is too far into the
522                    // future. We can chose a timestamp close to the current time, but then we may not be
523                    // getting results that are as fresh as possible. As a heuristic, we choose the minimum
524                    // of now and the upper, where we use the global timestamp oracle read timestamp as a
525                    // proxy for now. If upper > now, then we choose now and prevent blocking future
526                    // queries. If upper < now, then we choose the upper and prevent blocking the current
527                    // query.
528                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
529                        let mut advance_to = largest_not_in_advance_of_upper;
530                        if let Some(oracle_read_ts) = oracle_read_ts {
531                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
532                        }
533                        constraints.lower.push((
534                            Antichain::from_elem(advance_to),
535                            Reason::IsolationLevel(*isolation_level),
536                        ));
537                    }
538                }
539            }
540
541            constraints.minimize();
542            constraints
543        };
544
545        // Next we establish the preferences that we would like to apply to timestamp determination.
546        // Generally, we want to choose the freshest timestamp possible, although there are exceptions
547        // when we either want a maximally *stale* timestamp, or we want to protect other queries from
548        // a recklessly advanced timestamp.
549        let preferences = {
550            // Counter-intuitively, the only `when` that allows `can_advance_to_upper` is `Immediately`,
551            // and not `FreshestTableWrite`. This is because `FreshestTableWrite` instead imposes a lower
552            // bound through the `oracle_read_ts`, and then requires the stalest valid timestamp.
553
554            if when.can_advance_to_upper()
555                && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
556            {
557                Preference::FreshestAvailable
558            } else {
559                Preference::StalestValid
560            }
561
562            // TODO: `StrongSessionSerializable` has a different set of preferences that starts to tease
563            // out the trade-off between freshness and responsiveness. I think we don't yet know enough
564            // to properly frame these preferences, though they are clearly aimed at the right concerns.
565        };
566
567        // Determine a candidate based on constraints and preferences.
568        let constraint_candidate = {
569            let mut candidate = Timestamp::minimum();
570            candidate.advance_by(constraints.lower_bound().borrow());
571            // If we have a preference to be the freshest available, advance to the minimum
572            // of the upper bound constraints and the `largest_not_in_advance_of_upper`.
573            if let Preference::FreshestAvailable = preferences {
574                let mut upper_bound = constraints.upper_bound();
575                upper_bound.insert(largest_not_in_advance_of_upper);
576                candidate.advance_by(upper_bound.borrow());
577            }
578            // If the candidate strictly exceeds the upper bound, we didn't have a viable timestamp.
579            if constraints.upper_bound().less_than(&candidate) {
580                coord_bail!(generate_timestamp_not_valid_error_msg(
581                    id_bundle,
582                    compute_instance,
583                    read_holds,
584                    candidate
585                ));
586            } else {
587                candidate
588            }
589        };
590
591        Ok(RawTimestampDetermination {
592            timestamp: constraint_candidate,
593            constraints: Some(constraints),
594            session_oracle_read_ts,
595        })
596    }
597
598    /// Determines the timestamp for a query.
599    ///
600    /// Timestamp determination may fail due to the restricted validity of
601    /// traces. Each has a `since` and `upper` frontier, and are only valid
602    /// after `since` and sure to be available not after `upper`.
603    ///
604    /// The timeline that `id_bundle` belongs to is also returned, if one exists.
605    fn determine_timestamp_for(
606        &self,
607        session: &Session,
608        id_bundle: &CollectionIdBundle,
609        when: &QueryWhen,
610        compute_instance: ComputeInstanceId,
611        timeline_context: &TimelineContext,
612        oracle_read_ts: Option<Timestamp>,
613        real_time_recency_ts: Option<mz_repr::Timestamp>,
614        isolation_level: &IsolationLevel,
615        constraint_based: &ConstraintBasedTimestampSelection,
616    ) -> Result<
617        (
618            TimestampDetermination<mz_repr::Timestamp>,
619            ReadHolds<mz_repr::Timestamp>,
620        ),
621        AdapterError,
622    > {
623        // First, we acquire read holds that will ensure the queried collections
624        // stay queryable at the chosen timestamp.
625        let read_holds = self.acquire_read_holds(id_bundle);
626        let timeline = Self::get_timeline(timeline_context);
627
628        let since = read_holds.least_valid_read();
629        let upper = self.least_valid_write(id_bundle);
630        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
631
632        let raw_determination = match constraint_based {
633            ConstraintBasedTimestampSelection::Disabled => self.determine_timestamp_classical(
634                session,
635                &read_holds,
636                id_bundle,
637                when,
638                oracle_read_ts,
639                compute_instance,
640                real_time_recency_ts,
641                isolation_level,
642                &timeline,
643                largest_not_in_advance_of_upper,
644                &since,
645            )?,
646            ConstraintBasedTimestampSelection::Enabled => self
647                .determine_timestamp_via_constraints(
648                    session,
649                    &read_holds,
650                    id_bundle,
651                    when,
652                    oracle_read_ts,
653                    compute_instance,
654                    real_time_recency_ts,
655                    isolation_level,
656                    &timeline,
657                    largest_not_in_advance_of_upper,
658                )?,
659            ConstraintBasedTimestampSelection::Verify => {
660                let classical_determination = self.determine_timestamp_classical(
661                    session,
662                    &read_holds,
663                    id_bundle,
664                    when,
665                    oracle_read_ts,
666                    compute_instance,
667                    real_time_recency_ts,
668                    isolation_level,
669                    &timeline,
670                    largest_not_in_advance_of_upper,
671                    &since,
672                )?;
673
674                match self.determine_timestamp_via_constraints(
675                    session,
676                    &read_holds,
677                    id_bundle,
678                    when,
679                    oracle_read_ts,
680                    compute_instance,
681                    real_time_recency_ts,
682                    isolation_level,
683                    &timeline,
684                    largest_not_in_advance_of_upper,
685                ) {
686                    Ok(constraint_determination) => {
687                        soft_assert_eq_or_log!(
688                            classical_determination.timestamp,
689                            constraint_determination.timestamp,
690                            "timestamp determination mismatch"
691                        );
692                        if classical_determination.timestamp != constraint_determination.timestamp {
693                            tracing::info!(
694                                "timestamp constrains: {:?}",
695                                constraint_determination.constraints
696                            );
697                        }
698                        RawTimestampDetermination {
699                            timestamp: classical_determination.timestamp,
700                            constraints: constraint_determination.constraints,
701                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
702                        }
703                    }
704                    Err(e) => {
705                        event!(Level::ERROR, error = ?e, "constraint-based timestamp determination failed");
706                        RawTimestampDetermination {
707                            timestamp: classical_determination.timestamp,
708                            constraints: classical_determination.constraints,
709                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
710                        }
711                    }
712                }
713            }
714        };
715
716        let timestamp_context = TimestampContext::from_timeline_context(
717            raw_determination.timestamp,
718            oracle_read_ts,
719            timeline,
720            timeline_context,
721        );
722
723        let determination = TimestampDetermination {
724            timestamp_context,
725            since,
726            upper,
727            largest_not_in_advance_of_upper,
728            oracle_read_ts,
729            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
730            real_time_recency_ts,
731            constraints: raw_determination.constraints,
732        };
733
734        Ok((determination, read_holds))
735    }
736
    /// Acquires [`ReadHolds`] for the given `id_bundle` at the earliest possible
    /// times.
    ///
    /// This is a required method: no default implementation is provided here.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
740
741    /// The smallest common valid write frontier among the specified collections.
742    ///
743    /// Times that are not greater or equal to this frontier are complete for all collections
744    /// identified as arguments.
745    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
746        let mut upper = Antichain::new();
747        {
748            for (_id, _since, collection_upper) in
749                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
750            {
751                upper.extend(collection_upper);
752            }
753        }
754        {
755            for (instance, compute_ids) in &id_bundle.compute_ids {
756                for id in compute_ids.iter() {
757                    upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
758                }
759            }
760        }
761        upper
762    }
763
764    /// Returns `least_valid_write` - 1, i.e., each time in `least_valid_write` stepped back in a
765    /// saturating way.
766    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
767        let mut frontier = Antichain::new();
768        for t in self.least_valid_write(id_bundle) {
769            frontier.insert(t.step_back().unwrap_or(t));
770        }
771        frontier
772    }
773}
774
775fn generate_timestamp_not_valid_error_msg(
776    id_bundle: &CollectionIdBundle,
777    compute_instance: ComputeInstanceId,
778    read_holds: &ReadHolds<mz_repr::Timestamp>,
779    candidate: mz_repr::Timestamp,
780) -> String {
781    let mut invalid = Vec::new();
782
783    if let Some(compute_ids) = id_bundle.compute_ids.get(&compute_instance) {
784        for id in compute_ids {
785            let since = read_holds.since(id);
786            if !since.less_equal(&candidate) {
787                invalid.push((*id, since));
788            }
789        }
790    }
791
792    for id in id_bundle.storage_ids.iter() {
793        let since = read_holds.since(id);
794        if !since.less_equal(&candidate) {
795            invalid.push((*id, since));
796        }
797    }
798
799    format!(
800        "Timestamp ({}) is not valid for all inputs: {:?}",
801        candidate, invalid,
802    )
803}
804
805impl Coordinator {
806    pub(crate) async fn oracle_read_ts(
807        &self,
808        session: &Session,
809        timeline_ctx: &TimelineContext,
810        when: &QueryWhen,
811    ) -> Option<Timestamp> {
812        let isolation_level = session.vars().transaction_isolation().clone();
813        let timeline = Coordinator::get_timeline(timeline_ctx);
814        let needs_linearized_read_ts =
815            Coordinator::needs_linearized_read_ts(&isolation_level, when);
816
817        let oracle_read_ts = match timeline {
818            Some(timeline) if needs_linearized_read_ts => {
819                let timestamp_oracle = self.get_timestamp_oracle(&timeline);
820                Some(timestamp_oracle.read_ts().await)
821            }
822            Some(_) | None => None,
823        };
824
825        oracle_read_ts
826    }
827
828    /// Determines the timestamp for a query, acquires read holds that ensure the
829    /// query remains executable at that time, and returns those.
830    /// The caller is responsible for eventually dropping those read holds.
831    #[mz_ore::instrument(level = "debug")]
832    pub(crate) fn determine_timestamp(
833        &self,
834        session: &Session,
835        id_bundle: &CollectionIdBundle,
836        when: &QueryWhen,
837        compute_instance: ComputeInstanceId,
838        timeline_context: &TimelineContext,
839        oracle_read_ts: Option<Timestamp>,
840        real_time_recency_ts: Option<mz_repr::Timestamp>,
841    ) -> Result<
842        (
843            TimestampDetermination<mz_repr::Timestamp>,
844            ReadHolds<mz_repr::Timestamp>,
845        ),
846        AdapterError,
847    > {
848        let constraint_based = ConstraintBasedTimestampSelection::from_str(
849            &CONSTRAINT_BASED_TIMESTAMP_SELECTION
850                .get(self.catalog_state().system_config().dyncfgs()),
851        );
852
853        let isolation_level = session.vars().transaction_isolation();
854        let (det, read_holds) = self.determine_timestamp_for(
855            session,
856            id_bundle,
857            when,
858            compute_instance,
859            timeline_context,
860            oracle_read_ts,
861            real_time_recency_ts,
862            isolation_level,
863            &constraint_based,
864        )?;
865        self.metrics
866            .determine_timestamp
867            .with_label_values(&[
868                match det.respond_immediately() {
869                    true => "true",
870                    false => "false",
871                },
872                isolation_level.as_str(),
873                &compute_instance.to_string(),
874                constraint_based.as_str(),
875            ])
876            .inc();
877        if !det.respond_immediately()
878            && isolation_level == &IsolationLevel::StrictSerializable
879            && real_time_recency_ts.is_none()
880        {
881            if let Some(strict) = det.timestamp_context.timestamp() {
882                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
883                    session,
884                    id_bundle,
885                    when,
886                    compute_instance,
887                    timeline_context,
888                    oracle_read_ts,
889                    real_time_recency_ts,
890                    &IsolationLevel::Serializable,
891                    &constraint_based,
892                )?;
893
894                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
895                    self.metrics
896                        .timestamp_difference_for_strict_serializable_ms
897                        .with_label_values(&[
898                            &compute_instance.to_string(),
899                            constraint_based.as_str(),
900                        ])
901                        .observe(f64::cast_lossy(u64::from(
902                            strict.saturating_sub(*serializable),
903                        )));
904                }
905            }
906        }
907        Ok((det, read_holds))
908    }
909
910    /// The largest timestamp not greater or equal to an element of `upper`.
911    ///
912    /// If no such timestamp exists, for example because `upper` contains only the
913    /// minimal timestamp, the return value is `Timestamp::minimum()`.
914    pub(crate) fn largest_not_in_advance_of_upper(
915        upper: &Antichain<mz_repr::Timestamp>,
916    ) -> mz_repr::Timestamp {
917        // We peek at the largest element not in advance of `upper`, which
918        // involves a subtraction. If `upper` contains a zero timestamp there
919        // is no "prior" answer, and we do not want to peek at it as it risks
920        // hanging awaiting the response to data that may never arrive.
921        if let Some(upper) = upper.as_option() {
922            upper.step_back().unwrap_or_else(Timestamp::minimum)
923        } else {
924            // A complete trace can be read in its final form with this time.
925            //
926            // This should only happen for literals that have no sources or sources that
927            // are known to have completed (non-tailed files for example).
928            Timestamp::MAX
929        }
930    }
931
932    pub(crate) fn evaluate_when(
933        catalog: &CatalogState,
934        mut timestamp: MirScalarExpr,
935        session: &Session,
936    ) -> Result<mz_repr::Timestamp, AdapterError> {
937        let temp_storage = RowArena::new();
938        prep_scalar_expr(&mut timestamp, ExprPrepStyle::AsOfUpTo)?;
939        let evaled = timestamp.eval(&[], &temp_storage)?;
940        if evaled.is_null() {
941            coord_bail!("can't use {} as a mz_timestamp for AS OF or UP TO", evaled);
942        }
943        let ty = timestamp.typ(&[]);
944        Ok(match ty.scalar_type {
945            ScalarType::MzTimestamp => evaled.unwrap_mz_timestamp(),
946            ScalarType::Numeric { .. } => {
947                let n = evaled.unwrap_numeric().0;
948                n.try_into()?
949            }
950            ScalarType::Int16 => i64::from(evaled.unwrap_int16()).try_into()?,
951            ScalarType::Int32 => i64::from(evaled.unwrap_int32()).try_into()?,
952            ScalarType::Int64 => evaled.unwrap_int64().try_into()?,
953            ScalarType::UInt16 => u64::from(evaled.unwrap_uint16()).into(),
954            ScalarType::UInt32 => u64::from(evaled.unwrap_uint32()).into(),
955            ScalarType::UInt64 => evaled.unwrap_uint64().into(),
956            ScalarType::TimestampTz { .. } => {
957                evaled.unwrap_timestamptz().timestamp_millis().try_into()?
958            }
959            ScalarType::Timestamp { .. } => evaled
960                .unwrap_timestamp()
961                .and_utc()
962                .timestamp_millis()
963                .try_into()?,
964            _ => coord_bail!(
965                "can't use {} as a mz_timestamp for AS OF or UP TO",
966                catalog
967                    .for_session(session)
968                    .humanize_column_type(&ty, false)
969            ),
970        })
971    }
972}
973
/// Information used when determining the timestamp for a query.
///
/// This is the first element of the tuple returned by `determine_timestamp`;
/// the second element holds the accompanying read holds.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp context from `determine_timestamp`.
    pub timestamp_context: TimestampContext<T>,
    /// The read frontier of all involved sources.
    pub since: Antichain<T>,
    /// The write frontier of all involved sources.
    ///
    /// `respond_immediately` compares the chosen timestamp against this frontier.
    pub upper: Antichain<T>,
    /// The largest timestamp not in advance of upper.
    pub largest_not_in_advance_of_upper: T,
    /// The value of the timeline's oracle timestamp, if used.
    pub oracle_read_ts: Option<T>,
    /// The value of the session local timestamp's oracle timestamp, if used.
    pub session_oracle_read_ts: Option<T>,
    /// The value of the real time recency timestamp, if used.
    pub real_time_recency_ts: Option<T>,
    /// The constraints used by the constraint based solver.
    /// See the [`constraints`] module for more information.
    pub constraints: Option<Constraints>,
}
995
996impl<T: TimestampManipulation> TimestampDetermination<T> {
997    pub fn respond_immediately(&self) -> bool {
998        match &self.timestamp_context {
999            TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
1000                !self.upper.less_equal(chosen_ts)
1001            }
1002            TimestampContext::NoTimestamp => true,
1003        }
1004    }
1005}
1006
/// A [`TimestampDetermination`] together with the per-source details and
/// session information that the `Display` implementation of this type renders
/// as a textual explanation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
    /// The chosen timestamp from `determine_timestamp`.
    pub determination: TimestampDetermination<T>,
    /// Details about each source.
    pub sources: Vec<TimestampSource<T>>,
    /// Wall time of first statement executed in this transaction
    pub session_wall_time: DateTime<Utc>,
    /// Cached value of determination.respond_immediately()
    pub respond_immediately: bool,
}
1019
/// The name and frontiers of one source, as listed in a
/// [`TimestampExplanation`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
    /// Name shown in the `source <name>:` heading of the rendered explanation.
    pub name: String,
    /// Times rendered under the "read frontier" label.
    pub read_frontier: Vec<T>,
    /// Times rendered under the "write frontier" label.
    pub write_frontier: Vec<T>,
}
1026
/// Values whose textual rendering may depend on the [`Timeline`] they are
/// interpreted in.
pub trait DisplayableInTimeline {
    /// Writes `self` to `f`, given the optional `timeline` to render it in.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Pairs `self` with `timeline` in a [`DisplayInTimeline`] wrapper, whose
    /// `Display` and `Debug` implementations forward to [`Self::fmt`].
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
1033
1034impl DisplayableInTimeline for mz_repr::Timestamp {
1035    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1036        if let Some(Timeline::EpochMilliseconds) = timeline {
1037            let ts_ms: u64 = self.into();
1038            if let Ok(ts_ms) = i64::try_from(ts_ms) {
1039                if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
1040                    return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
1041                }
1042            }
1043        }
1044        write!(f, "{:13}", self)
1045    }
1046}
1047
/// Wrapper pairing a value with an optional [`Timeline`]; this is what
/// [`DisplayableInTimeline::display`] returns.
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to render.
    t: &'a T,
    /// The timeline handed to [`DisplayableInTimeline::fmt`], if any.
    timeline: Option<&'a Timeline>,
}
1052impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
1053where
1054    T: DisplayableInTimeline,
1055{
1056    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1057        self.t.fmt(self.timeline, f)
1058    }
1059}
1060
1061impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
1062where
1063    T: DisplayableInTimeline,
1064{
1065    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1066        fmt::Display::fmt(&self, f)
1067    }
1068}
1069
1070impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
1071    for TimestampExplanation<T>
1072{
1073    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1074        let timeline = self.determination.timestamp_context.timeline();
1075        writeln!(
1076            f,
1077            "                query timestamp: {}",
1078            self.determination
1079                .timestamp_context
1080                .timestamp_or_default()
1081                .display(timeline)
1082        )?;
1083        if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
1084            writeln!(
1085                f,
1086                "          oracle read timestamp: {}",
1087                oracle_read_ts.display(timeline)
1088            )?;
1089        }
1090        if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
1091            writeln!(
1092                f,
1093                "  session oracle read timestamp: {}",
1094                session_oracle_read_ts.display(timeline)
1095            )?;
1096        }
1097        if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
1098            writeln!(
1099                f,
1100                "    real time recency timestamp: {}",
1101                real_time_recency_ts.display(timeline)
1102            )?;
1103        }
1104        writeln!(
1105            f,
1106            "largest not in advance of upper: {}",
1107            self.determination
1108                .largest_not_in_advance_of_upper
1109                .display(timeline),
1110        )?;
1111        writeln!(
1112            f,
1113            "                          upper:{:?}",
1114            self.determination
1115                .upper
1116                .iter()
1117                .map(|t| t.display(timeline))
1118                .collect::<Vec<_>>()
1119        )?;
1120        writeln!(
1121            f,
1122            "                          since:{:?}",
1123            self.determination
1124                .since
1125                .iter()
1126                .map(|t| t.display(timeline))
1127                .collect::<Vec<_>>()
1128        )?;
1129        writeln!(
1130            f,
1131            "        can respond immediately: {}",
1132            self.respond_immediately
1133        )?;
1134        writeln!(f, "                       timeline: {:?}", &timeline)?;
1135        writeln!(
1136            f,
1137            "              session wall time: {:13} ({})",
1138            self.session_wall_time.timestamp_millis(),
1139            self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
1140        )?;
1141
1142        for source in &self.sources {
1143            writeln!(f, "")?;
1144            writeln!(f, "source {}:", source.name)?;
1145            writeln!(
1146                f,
1147                "                  read frontier:{:?}",
1148                source
1149                    .read_frontier
1150                    .iter()
1151                    .map(|t| t.display(timeline))
1152                    .collect::<Vec<_>>()
1153            )?;
1154            writeln!(
1155                f,
1156                "                 write frontier:{:?}",
1157                source
1158                    .write_frontier
1159                    .iter()
1160                    .map(|t| t.display(timeline))
1161                    .collect::<Vec<_>>()
1162            )?;
1163        }
1164
1165        if let Some(constraints) = &self.determination.constraints {
1166            writeln!(f, "")?;
1167            writeln!(f, "binding constraints:")?;
1168            write!(f, "{}", constraints.display(timeline))?;
1169        }
1170
1171        Ok(())
1172    }
1173}
1174
1175/// Types and logic in support of a constraint-based approach to timestamp determination.
1176mod constraints {
1177
1178    use core::fmt;
1179    use std::fmt::Debug;
1180
1181    use differential_dataflow::lattice::Lattice;
1182    use mz_storage_types::sources::Timeline;
1183    use serde::{Deserialize, Serialize};
1184    use timely::progress::{Antichain, Timestamp};
1185
1186    use mz_compute_types::ComputeInstanceId;
1187    use mz_repr::GlobalId;
1188    use mz_sql::session::vars::IsolationLevel;
1189
1190    use super::DisplayableInTimeline;
1191
    /// Constraints expressed on the timestamp of a query.
    ///
    /// The constraints are expressed on the minimum and maximum values,
    /// resulting in a (possibly empty) interval of valid timestamps.
    ///
    /// The constraints may be redundant, in the interest of providing
    /// more complete explanations, but they may also be minimized at
    /// any point without altering their behavior by removing redundant
    /// constraints (see `minimize`).
    ///
    /// When combined with a `Preference` one can determine an
    /// ideal timestamp to use.
    #[derive(Default, Serialize, Deserialize, Clone)]
    pub struct Constraints {
        /// Timestamps and reasons that impose an inclusive lower bound.
        /// `lower_bound` combines these into a single frontier.
        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
        /// Timestamps and reasons that impose an inclusive upper bound.
        /// `upper_bound` combines these into a single frontier.
        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
    }
1211
1212    impl DisplayableInTimeline for Constraints {
1213        fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1214            if !self.lower.is_empty() {
1215                writeln!(f, "lower:")?;
1216                for (ts, reason) in &self.lower {
1217                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1218                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1219                }
1220            }
1221            if !self.upper.is_empty() {
1222                writeln!(f, "upper:")?;
1223                for (ts, reason) in &self.upper {
1224                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1225                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1226                }
1227            }
1228            Ok(())
1229        }
1230    }
1231
1232    impl Debug for Constraints {
1233        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1234            self.display(None).fmt(f)?;
1235            Ok(())
1236        }
1237    }
1238
1239    impl Constraints {
1240        /// Remove constraints that are dominated by other constraints.
1241        ///
1242        /// This removes redundant constraints, without removing constraints
1243        /// that are "tight" in the sense that the interval would be
1244        /// meaningfully different without them.
1245        /// For example, two constraints at the same
1246        /// time will both be retained, in the interest of full information.
1247        /// But a lower bound constraint at time `t` will be removed if there is a
1248        /// constraint at time `t + 1` (or any larger time).
1249        pub fn minimize(&mut self) {
1250            // Establish the upper bound of lower constraints.
1251            let lower_frontier = self.lower_bound();
1252            // Retain constraints that intersect `lower_frontier`.
1253            self.lower.retain(|(anti, _)| {
1254                anti.iter()
1255                    .any(|time| lower_frontier.elements().contains(time))
1256            });
1257
1258            // Establish the lower bound of upper constraints.
1259            let upper_frontier = self.upper_bound();
1260            // Retain constraints that intersect `upper_frontier`.
1261            self.upper.retain(|(anti, _)| {
1262                anti.iter()
1263                    .any(|time| upper_frontier.elements().contains(time))
1264            });
1265        }
1266
1267        /// An antichain equal to the least upper bound of lower bounds.
1268        pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1269            let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1270            for (anti, _) in self.lower.iter() {
1271                lower = lower.join(anti);
1272            }
1273            lower
1274        }
1275        /// An antichain equal to the greatest lower bound of upper bounds.
1276        pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1277            self.upper
1278                .iter()
1279                .flat_map(|(anti, _)| anti.iter())
1280                .cloned()
1281                .collect()
1282        }
1283    }
1284
    /// An explanation of reasons for a timestamp constraint.
    #[derive(Serialize, Deserialize, Clone)]
    pub enum Reason {
        /// A compute input at a compute instance.
        /// This is something like an index or view
        /// that is maintained by compute.
        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
        /// A storage input.
        StorageInput(Vec<GlobalId>),
        /// A specified isolation level and the timestamp it requires.
        IsolationLevel(IsolationLevel),
        /// Real-time recency may constrain the timestamp from below.
        RealTimeRecency,
        /// The query expressed its own constraint on the timestamp.
        QueryAsOf,
    }
1301
1302    impl Debug for Reason {
1303        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1304            match self {
1305                Reason::ComputeInput(ids) => write_split_ids(f, "ComputeInput", ids),
1306                Reason::StorageInput(ids) => write_split_ids(f, "StorageInput", ids),
1307                Reason::IsolationLevel(level) => {
1308                    write!(f, "IsolationLevel({:?})", level)
1309                }
1310                Reason::RealTimeRecency => {
1311                    write!(f, "RealTimeRecency")
1312                }
1313                Reason::QueryAsOf => {
1314                    write!(f, "QueryAsOf")
1315                }
1316            }
1317        }
1318    }
1319
1320    //TODO: This is a bit of a hack to make the debug output of constraints more readable.
1321    //We should probably have a more structured way to do this.
1322    fn write_split_ids<T: Debug>(f: &mut fmt::Formatter, label: &str, ids: &[T]) -> fmt::Result {
1323        let (ids, rest) = if ids.len() > 10 {
1324            ids.split_at(10)
1325        } else {
1326            let rest: &[T] = &[];
1327            (ids, rest)
1328        };
1329        if rest.is_empty() {
1330            write!(f, "{}({:?})", label, ids)
1331        } else {
1332            write!(f, "{}({:?}, ... {} more)", label, ids, rest.len())
1333        }
1334    }
1335
    /// Given an interval [read, write) of timestamp options,
    /// this expresses a preference for either end of the spectrum.
    pub enum Preference {
        /// Prefer the greatest timestamp immediately available.
        ///
        /// This considers the immediate inputs to a query and
        /// selects the greatest timestamp not greater or equal
        /// to any of their write frontiers.
        ///
        /// The preference only relates to immediate query inputs,
        /// but it could be extended to transitive inputs as well.
        /// For example, one could imagine preferring the freshest
        /// data known to be ingested into Materialize, under the
        /// premise that those answers should soon become available,
        /// and may be more fresh than the immediate inputs.
        FreshestAvailable,
        /// Prefer the least valid timestamp.
        ///
        /// This is useful when one has no expressed freshness
        /// constraints, and wants to minimally impact others.
        /// For example, `AS OF AT LEAST <time>`.
        StalestValid,
    }
1359}