mz_adapter/coord/
timestamp_selection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for selecting timestamps for various operations on collections.
11
12use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
20use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
21use mz_compute_types::ComputeInstanceId;
22use mz_ore::cast::CastLossy;
23use mz_ore::soft_assert_eq_or_log;
24use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
25use mz_sql::plan::QueryWhen;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_sql::session::vars::IsolationLevel;
28use mz_storage_types::sources::Timeline;
29use serde::{Deserialize, Serialize};
30use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
31use tracing::{Level, event};
32
33use crate::AdapterError;
34use crate::catalog::CatalogState;
35use crate::coord::Coordinator;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::read_policy::ReadHolds;
38use crate::coord::timeline::TimelineContext;
39use crate::session::Session;
40
/// The timeline and timestamp context of a read.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext<T> {
    /// Read is executed in a specific timeline with a specific timestamp.
    TimelineTimestamp {
        /// The timeline the read is executed in.
        timeline: Timeline,
        /// The timestamp that was chosen for a read. This can differ from the
        /// `oracle_ts` when collections are not readable at the (linearized)
        /// timestamp for the oracle. In those cases (when the chosen timestamp
        /// is further ahead than the oracle timestamp) we have to delay
        /// returning peek results until the timestamp oracle is also
        /// sufficiently advanced.
        chosen_ts: T,
        /// The timestamp that would have been chosen for the read by the
        /// (linearized) timestamp oracle. In most cases this will be picked as
        /// the `chosen_ts`.
        oracle_ts: Option<T>,
    },
    /// Read is executed without a timeline or timestamp.
    NoTimestamp,
}
62
63impl<T: TimestampManipulation> TimestampContext<T> {
64    /// Creates a `TimestampContext` from a timestamp and `TimelineContext`.
65    pub fn from_timeline_context(
66        chosen_ts: T,
67        oracle_ts: Option<T>,
68        transaction_timeline: Option<Timeline>,
69        timeline_context: &TimelineContext,
70    ) -> TimestampContext<T> {
71        match timeline_context {
72            TimelineContext::TimelineDependent(timeline) => {
73                if let Some(transaction_timeline) = transaction_timeline {
74                    assert_eq!(timeline, &transaction_timeline);
75                }
76                Self::TimelineTimestamp {
77                    timeline: timeline.clone(),
78                    chosen_ts,
79                    oracle_ts,
80                }
81            }
82            TimelineContext::TimestampDependent => {
83                // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
84                Self::TimelineTimestamp {
85                    timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
86                    chosen_ts,
87                    oracle_ts,
88                }
89            }
90            TimelineContext::TimestampIndependent => Self::NoTimestamp,
91        }
92    }
93
94    /// The timeline belonging to this context, if one exists.
95    pub fn timeline(&self) -> Option<&Timeline> {
96        self.timeline_timestamp().map(|tt| tt.0)
97    }
98
99    /// The timestamp belonging to this context, if one exists.
100    pub fn timestamp(&self) -> Option<&T> {
101        self.timeline_timestamp().map(|tt| tt.1)
102    }
103
104    /// The timeline and timestamp belonging to this context, if one exists.
105    pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
106        match self {
107            Self::TimelineTimestamp {
108                timeline,
109                chosen_ts,
110                ..
111            } => Some((timeline, chosen_ts)),
112            Self::NoTimestamp => None,
113        }
114    }
115
116    /// The timestamp belonging to this context, or a sensible default if one does not exists.
117    pub fn timestamp_or_default(&self) -> T {
118        match self {
119            Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
120            // Anything without a timestamp is given the maximum possible timestamp to indicate
121            // that they have been closed up until the end of time. This allows us to SUBSCRIBE to
122            // static views.
123            Self::NoTimestamp => T::maximum(),
124        }
125    }
126
127    /// Whether or not the context contains a timestamp.
128    pub fn contains_timestamp(&self) -> bool {
129        self.timestamp().is_some()
130    }
131
132    /// Converts this `TimestampContext` to an `Antichain`.
133    pub fn antichain(&self) -> Antichain<T> {
134        Antichain::from_elem(self.timestamp_or_default())
135    }
136}
137
138#[async_trait(?Send)]
139impl TimestampProvider for Coordinator {
140    /// Reports a collection's current read frontier.
141    fn compute_read_frontier(
142        &self,
143        instance: ComputeInstanceId,
144        id: GlobalId,
145    ) -> Antichain<Timestamp> {
146        self.controller
147            .compute
148            .collection_frontiers(id, Some(instance))
149            .expect("id does not exist")
150            .read_frontier
151    }
152
153    /// Reports a collection's current write frontier.
154    fn compute_write_frontier(
155        &self,
156        instance: ComputeInstanceId,
157        id: GlobalId,
158    ) -> Antichain<Timestamp> {
159        self.controller
160            .compute
161            .collection_frontiers(id, Some(instance))
162            .expect("id does not exist")
163            .write_frontier
164    }
165
166    fn storage_frontiers(
167        &self,
168        ids: Vec<GlobalId>,
169    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
170        self.controller
171            .storage
172            .collections_frontiers(ids)
173            .expect("missing collections")
174    }
175
176    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
177        self.acquire_read_holds(id_bundle)
178    }
179
180    fn catalog_state(&self) -> &CatalogState {
181        self.catalog().state()
182    }
183}
184
/// A timestamp determination, which includes the timestamp, constraints, and session oracle read
/// timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination<T> {
    /// The timestamp that was selected for the query.
    pub timestamp: T,
    /// The constraints the timestamp was derived from. `determine_timestamp_classical` always
    /// sets this to `None`; `determine_timestamp_via_constraints` always sets it to `Some`.
    pub constraints: Option<Constraints>,
    /// The read timestamp of the session's timestamp oracle for the query's timeline. Only
    /// populated under `IsolationLevel::StrongSessionSerializable`, and only when the session
    /// has an oracle for that timeline.
    pub session_oracle_read_ts: Option<T>,
}
193
194#[async_trait(?Send)]
195pub trait TimestampProvider {
    /// Reports the current read frontier of the collection `id` on the compute instance
    /// `instance`.
    fn compute_read_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
    /// Reports the current write frontier of the collection `id` on the compute instance
    /// `instance`.
    fn compute_write_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
206
    /// Returns the implied capability (since) and write frontier (upper) for
    /// the specified storage collections.
    ///
    /// Each entry also carries the id of the collection it describes.
    /// NOTE(review): the two frontiers are presumably ordered `(since, upper)` within the
    /// tuple, matching the sentence above; confirm against the storage controller.
    fn storage_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
213
    /// Returns a reference to the catalog state available to this provider.
    fn catalog_state(&self) -> &CatalogState;
215
216    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
217        let timeline = match timeline_context {
218            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
219            // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
220            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
221            TimelineContext::TimestampIndependent => None,
222        };
223
224        timeline
225    }
226
227    /// Returns true if-and-only-if the given configuration needs a linearized
228    /// read timestamp from a timestamp oracle.
229    ///
230    /// This assumes that the query happens in the context of a timeline. If
231    /// there is no timeline, we cannot and don't have to get a linearized read
232    /// timestamp.
233    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
234        // When we're in the context of a timeline (assumption) and one of these
235        // scenarios hold, we need to use a linearized read timestamp:
236        // - The isolation level is Strict Serializable and the `when` allows us to use the
237        //   the timestamp oracle (ex: queries with no AS OF).
238        // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries).
239        when.must_advance_to_timeline_ts()
240            || (when.can_advance_to_timeline_ts()
241                && matches!(
242                    isolation_level,
243                    IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
244                ))
245    }
246
    /// Determines the timestamp for a query using the classical logic (as opposed to constraint-based).
    ///
    /// The candidate starts at `Timestamp::minimum()` and is only ever advanced (joined) by the
    /// inputs that apply: the `when`'s explicit timestamp, `since`, `oracle_read_ts`,
    /// `largest_not_in_advance_of_upper`, `real_time_recency_ts`, and, under Strong Session
    /// Serializable, the session oracle's read timestamp. The result is accepted only if it is
    /// not earlier than `since`.
    ///
    /// `read_holds`, `id_bundle` and `compute_instance` are used solely to build the error
    /// message when no valid timestamp exists. The returned determination always has
    /// `constraints: None`.
    ///
    /// # Errors
    ///
    /// Returns an error if `real_time_recency_ts` is supplied while real time recency is not
    /// enabled for the session or the isolation level is not Strict Serializable, and if the
    /// final candidate is not greater than or equal to some element of `since`.
    ///
    /// # Panics
    ///
    /// Panics if `timeline` is set and `needs_linearized_read_ts` holds for the given
    /// `isolation_level` and `when`, but `oracle_read_ts` is `None`.
    fn determine_timestamp_classical(
        session: &Session,
        read_holds: &ReadHolds<Timestamp>,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        oracle_read_ts: Option<Timestamp>,
        compute_instance: ComputeInstanceId,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
        timeline: &Option<Timeline>,
        largest_not_in_advance_of_upper: Timestamp,
        since: &Antichain<Timestamp>,
    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
        let mut session_oracle_read_ts = None;
        // Each involved trace has a validity interval `[since, upper)`.
        // The contents of a trace are only guaranteed to be correct when
        // accumulated at a time greater or equal to `since`, and they
        // are only guaranteed to be currently present for times not
        // greater or equal to `upper`.
        //
        // The plan is to first determine a timestamp, based on the requested
        // timestamp policy, and then determine if it can be satisfied using
        // the compacted arrangements we have at hand. It remains unresolved
        // what to do if it cannot be satisfied (perhaps the query should use
        // a larger timestamp and block, perhaps the user should intervene).

        {
            // TODO: We currently split out getting the oracle timestamp because
            // it's a potentially expensive call, but a call that can be done in an
            // async task. TimestampProvider is not Send (nor Sync), so we cannot do
            // the call to `determine_timestamp_for` (including the oracle call) on
            // an async task. If/when TimestampProvider can become Send, we can fold
            // the call to the TimestampOracle back into this function.
            //
            // We assert here that the logic that determines the oracle timestamp
            // matches our expectations.

            if timeline.is_some() && Self::needs_linearized_read_ts(isolation_level, when) {
                assert!(
                    oracle_read_ts.is_some(),
                    "should get a timestamp from the oracle for linearized timeline {:?} but didn't",
                    timeline
                );
            }
        }

        // Initialize candidate to the minimum correct time.
        let mut candidate = Timestamp::minimum();

        // An explicit timestamp requested by the `when` is a lower bound for the candidate.
        if let Some(ts) = when.advance_to_timestamp() {
            candidate.join_assign(&ts);
        }

        if when.advance_to_since() {
            // Note: This `advance_by` is a no-op if the given frontier is `[]`.
            candidate.advance_by(since.borrow());
        }

        // If we've acquired a read timestamp from the timestamp oracle, use it
        // as the new lower bound for the candidate.
        // In Strong Session Serializable, we ignore the oracle timestamp for now, unless we need
        // to use it.
        if let Some(timestamp) = &oracle_read_ts {
            if isolation_level != &IsolationLevel::StrongSessionSerializable
                || when.must_advance_to_timeline_ts()
            {
                candidate.join_assign(timestamp);
            }
        }

        // We advance to the upper in the following scenarios:
        // - The isolation level is Serializable and the `when` allows us to advance to upper (ex:
        //   queries with no AS OF). We avoid using the upper in Strict Serializable to prevent
        //   reading source data that is being written to in the future.
        // - There is no timeline (whatever the isolation level, ex: Strict Serializable) and the
        //   `when` allows us to advance to upper.
        if when.can_advance_to_upper()
            && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
        {
            candidate.join_assign(&largest_not_in_advance_of_upper);
        }

        if let Some(real_time_recency_ts) = real_time_recency_ts {
            if !(session.vars().real_time_recency()
                && isolation_level == &IsolationLevel::StrictSerializable)
            {
                // Erring on the side of caution, let's bail out here.
                // This should never happen in practice, as the real time recency timestamp should
                // only be supplied when real time recency is enabled.
                coord_bail!(
                    "real time recency timestamp should only be supplied when real time recency \
                            is enabled and the isolation level is strict serializable"
                );
            }
            candidate.join_assign(&real_time_recency_ts);
        }

        if isolation_level == &IsolationLevel::StrongSessionSerializable {
            // The session's own oracle for this timeline, if it has one, is a lower bound, and
            // its read timestamp is reported back to the caller.
            if let Some(timeline) = &timeline {
                if let Some(oracle) = session.get_timestamp_oracle(timeline) {
                    let session_ts = oracle.read_ts();
                    candidate.join_assign(&session_ts);
                    session_oracle_read_ts = Some(session_ts);
                }
            }

            // When advancing the read timestamp under Strong Session Serializable, there is a
            // trade-off to make between freshness and latency. We can choose a timestamp close to
            // the `upper`, but then later queries might block if the `upper` is too far into the
            // future. We can choose a timestamp close to the current time, but then we may not be
            // getting results that are as fresh as possible. As a heuristic, we choose the minimum
            // of now and the upper, where we use the global timestamp oracle read timestamp as a
            // proxy for now. If upper > now, then we choose now and prevent blocking future
            // queries. If upper < now, then we choose the upper and prevent blocking the current
            // query.
            if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
                let mut advance_to = largest_not_in_advance_of_upper;
                if let Some(oracle_read_ts) = oracle_read_ts {
                    advance_to = std::cmp::min(advance_to, oracle_read_ts);
                }
                candidate.join_assign(&advance_to);
            }
        }

        // If the timestamp is greater or equal to some element in `since` we are
        // assured that the answer will be correct.
        //
        // It's ok for this timestamp to be larger than the current timestamp of
        // the timestamp oracle. For Strict Serializable queries, the Coord will
        // linearize the query by holding back the result until the timestamp
        // oracle catches up.
        let timestamp = if since.less_equal(&candidate) {
            event!(
                Level::DEBUG,
                conn_id = format!("{}", session.conn_id()),
                since = format!("{since:?}"),
                largest_not_in_advance_of_upper = format!("{largest_not_in_advance_of_upper}"),
                timestamp = format!("{candidate}")
            );
            candidate
        } else {
            // This can happen not just when the query has AS OF, but also when the passed in
            // `since` is `[]`.
            coord_bail!(generate_timestamp_not_valid_error_msg(
                id_bundle,
                compute_instance,
                read_holds,
                candidate
            ));
        };
        Ok(RawTimestampDetermination {
            timestamp,
            constraints: None,
            session_oracle_read_ts,
        })
    }
404
405    /// Uses constraints and preferences to determine a timestamp for a query.
406    /// Returns the determined timestamp, the constraints that were applied, and
407    /// session_oracle_read_ts.
408    fn determine_timestamp_via_constraints(
409        session: &Session,
410        read_holds: &ReadHolds<Timestamp>,
411        id_bundle: &CollectionIdBundle,
412        when: &QueryWhen,
413        oracle_read_ts: Option<Timestamp>,
414        compute_instance: ComputeInstanceId,
415        real_time_recency_ts: Option<Timestamp>,
416        isolation_level: &IsolationLevel,
417        timeline: &Option<Timeline>,
418        largest_not_in_advance_of_upper: Timestamp,
419    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
420        use constraints::{Constraints, Preference, Reason};
421
422        let mut session_oracle_read_ts = None;
423        // We start by establishing the hard constraints that must be applied to timestamp determination.
424        // These constraints are derived from the input arguments, and properties of the collections involved.
425        // TODO: Many of the constraints are expressed obliquely, and could be made more direct.
426        let constraints = {
427            // Constraints we will populate through a sequence of opinions.
428            let mut constraints = Constraints::default();
429
430            // First, we have validity constraints from the `id_bundle` argument which indicates
431            // which collections we are reading from.
432            // TODO: Refine the detail about which identifiers are binding and which are not.
433            // TODO(dov): It's not entirely clear to me that there ever would be a non
434            // binding constraint introduced by the `id_bundle`. We should revisit this.
435            let since = read_holds.least_valid_read();
436            let storage = id_bundle
437                .storage_ids
438                .iter()
439                .cloned()
440                .collect::<Vec<GlobalId>>();
441            if !storage.is_empty() {
442                constraints
443                    .lower
444                    .push((since.clone(), Reason::StorageInput(storage)));
445            }
446            let compute = id_bundle
447                .compute_ids
448                .iter()
449                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
450                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
451            if !compute.is_empty() {
452                constraints
453                    .lower
454                    .push((since.clone(), Reason::ComputeInput(compute)));
455            }
456
457            // The query's `when` may indicates a specific timestamp we must advance to, or a specific value we must use.
458            if let Some(ts) = when.advance_to_timestamp() {
459                constraints
460                    .lower
461                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
462                // If the query is at a specific timestamp, we must introduce an upper bound as well.
463                if when.constrains_upper() {
464                    constraints
465                        .upper
466                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
467                }
468            }
469
470            // The specification of an `oracle_read_ts` may indicates that we must advance to it,
471            // except in one isolation mode, or if `when` does not indicate that we should.
472            // At the moment, only `QueryWhen::FreshestTableWrite` indicates that we should.
473            // TODO: Should this just depend on the isolation level?
474            if let Some(timestamp) = &oracle_read_ts {
475                if isolation_level != &IsolationLevel::StrongSessionSerializable
476                    || when.must_advance_to_timeline_ts()
477                {
478                    // When specification of an `oracle_read_ts` is required, we must advance to it.
479                    // If it's not present, lets bail out.
480                    constraints.lower.push((
481                        Antichain::from_elem(*timestamp),
482                        Reason::IsolationLevel(*isolation_level),
483                    ));
484                }
485            }
486
487            // If a real time recency timestamp is supplied, we must advance to it.
488            if let Some(real_time_recency_ts) = real_time_recency_ts {
489                assert!(
490                    session.vars().real_time_recency()
491                        && isolation_level == &IsolationLevel::StrictSerializable,
492                    "real time recency timestamp should only be supplied when real time recency \
493                                is enabled and the isolation level is strict serializable"
494                );
495                constraints.lower.push((
496                    Antichain::from_elem(real_time_recency_ts),
497                    Reason::RealTimeRecency,
498                ));
499            }
500
501            // If we are operating in Strong Session Serializable, we use an alternate timestamp lower bound.
502            if isolation_level == &IsolationLevel::StrongSessionSerializable {
503                if let Some(timeline) = &timeline {
504                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
505                        let session_ts = oracle.read_ts();
506                        constraints.lower.push((
507                            Antichain::from_elem(session_ts),
508                            Reason::IsolationLevel(*isolation_level),
509                        ));
510                        session_oracle_read_ts = Some(session_ts);
511                    }
512
513                    // When advancing the read timestamp under Strong Session Serializable, there is a
514                    // trade-off to make between freshness and latency. We can choose a timestamp close the
515                    // `upper`, but then later queries might block if the `upper` is too far into the
516                    // future. We can chose a timestamp close to the current time, but then we may not be
517                    // getting results that are as fresh as possible. As a heuristic, we choose the minimum
518                    // of now and the upper, where we use the global timestamp oracle read timestamp as a
519                    // proxy for now. If upper > now, then we choose now and prevent blocking future
520                    // queries. If upper < now, then we choose the upper and prevent blocking the current
521                    // query.
522                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
523                        let mut advance_to = largest_not_in_advance_of_upper;
524                        if let Some(oracle_read_ts) = oracle_read_ts {
525                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
526                        }
527                        constraints.lower.push((
528                            Antichain::from_elem(advance_to),
529                            Reason::IsolationLevel(*isolation_level),
530                        ));
531                    }
532                }
533            }
534
535            constraints.minimize();
536            constraints
537        };
538
539        // Next we establish the preferences that we would like to apply to timestamp determination.
540        // Generally, we want to choose the freshest timestamp possible, although there are exceptions
541        // when we either want a maximally *stale* timestamp, or we want to protect other queries from
542        // a recklessly advanced timestamp.
543        let preferences = {
544            // Counter-intuitively, the only `when` that allows `can_advance_to_upper` is `Immediately`,
545            // and not `FreshestTableWrite`. This is because `FreshestTableWrite` instead imposes a lower
546            // bound through the `oracle_read_ts`, and then requires the stalest valid timestamp.
547
548            if when.can_advance_to_upper()
549                && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
550            {
551                Preference::FreshestAvailable
552            } else {
553                Preference::StalestValid
554            }
555
556            // TODO: `StrongSessionSerializable` has a different set of preferences that starts to tease
557            // out the trade-off between freshness and responsiveness. I think we don't yet know enough
558            // to properly frame these preferences, though they are clearly aimed at the right concerns.
559        };
560
561        // Determine a candidate based on constraints and preferences.
562        let constraint_candidate = {
563            let mut candidate = Timestamp::minimum();
564            // Note: These `advance_by` calls are no-ops if the given frontier is `[]`.
565            candidate.advance_by(constraints.lower_bound().borrow());
566            // If we have a preference to be the freshest available, advance to the minimum
567            // of the upper bound constraints and the `largest_not_in_advance_of_upper`.
568            if let Preference::FreshestAvailable = preferences {
569                let mut upper_bound = constraints.upper_bound();
570                upper_bound.insert(largest_not_in_advance_of_upper);
571                candidate.advance_by(upper_bound.borrow());
572            }
573            // If the candidate is strictly outside the constraints, we didn't have a viable
574            // timestamp. This can happen e.g. when the query has AS OF, or when the lower bound is
575            // `[]`.
576            if !constraints.lower_bound().less_equal(&candidate)
577                || constraints.upper_bound().less_than(&candidate)
578            {
579                // TODO: Generate a better error msg, which includes all the constraints.
580                coord_bail!(generate_timestamp_not_valid_error_msg(
581                    id_bundle,
582                    compute_instance,
583                    read_holds,
584                    candidate
585                ));
586            } else {
587                candidate
588            }
589        };
590
591        Ok(RawTimestampDetermination {
592            timestamp: constraint_candidate,
593            constraints: Some(constraints),
594            session_oracle_read_ts,
595        })
596    }
597
598    /// Determines the timestamp for a query.
599    ///
600    /// Timestamp determination may fail due to the restricted validity of
601    /// traces. Each has a `since` and `upper` frontier, and are only valid
602    /// after `since` and sure to be available not after `upper`.
603    ///
604    /// The timeline that `id_bundle` belongs to is also returned, if one exists.
605    fn determine_timestamp_for(
606        &self,
607        session: &Session,
608        id_bundle: &CollectionIdBundle,
609        when: &QueryWhen,
610        compute_instance: ComputeInstanceId,
611        timeline_context: &TimelineContext,
612        oracle_read_ts: Option<Timestamp>,
613        real_time_recency_ts: Option<Timestamp>,
614        isolation_level: &IsolationLevel,
615        constraint_based: &ConstraintBasedTimestampSelection,
616    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
617        // First, we acquire read holds that will ensure the queried collections
618        // stay queryable at the chosen timestamp.
619        let read_holds = self.acquire_read_holds(id_bundle);
620
621        let upper = self.least_valid_write(id_bundle);
622
623        Self::determine_timestamp_for_inner(
624            session,
625            id_bundle,
626            when,
627            compute_instance,
628            timeline_context,
629            oracle_read_ts,
630            real_time_recency_ts,
631            isolation_level,
632            constraint_based,
633            read_holds,
634            upper,
635        )
636    }
637
638    /// Same as determine_timestamp_for, but read_holds and least_valid_write are already passed in.
639    fn determine_timestamp_for_inner(
640        session: &Session,
641        id_bundle: &CollectionIdBundle,
642        when: &QueryWhen,
643        compute_instance: ComputeInstanceId,
644        timeline_context: &TimelineContext,
645        oracle_read_ts: Option<Timestamp>,
646        real_time_recency_ts: Option<Timestamp>,
647        isolation_level: &IsolationLevel,
648        constraint_based: &ConstraintBasedTimestampSelection,
649        read_holds: ReadHolds<Timestamp>,
650        upper: Antichain<Timestamp>,
651    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
652        let timeline = Self::get_timeline(timeline_context);
653        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
654        let since = read_holds.least_valid_read();
655
656        // If the `since` is empty, then timestamp determination would fail. Let's return a more
657        // specific error in this case: Empty `since` frontiers happen here when collections were
658        // dropped concurrently with sequencing the query.
659        if since.is_empty() {
660            // Figure out what made the since frontier empty.
661            let mut unreadable_collections = Vec::new();
662            for (coll_id, hold) in read_holds.storage_holds {
663                if hold.since().is_empty() {
664                    unreadable_collections.push(coll_id);
665                }
666            }
667            for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
668                if hold.since().is_empty() {
669                    unreadable_collections.push(coll_id);
670                }
671            }
672            return Err(AdapterError::CollectionUnreadable {
673                id: unreadable_collections.into_iter().join(", "),
674            });
675        }
676
677        let raw_determination = match constraint_based {
678            ConstraintBasedTimestampSelection::Disabled => Self::determine_timestamp_classical(
679                session,
680                &read_holds,
681                id_bundle,
682                when,
683                oracle_read_ts,
684                compute_instance,
685                real_time_recency_ts,
686                isolation_level,
687                &timeline,
688                largest_not_in_advance_of_upper,
689                &since,
690            )?,
691            ConstraintBasedTimestampSelection::Enabled => {
692                Self::determine_timestamp_via_constraints(
693                    session,
694                    &read_holds,
695                    id_bundle,
696                    when,
697                    oracle_read_ts,
698                    compute_instance,
699                    real_time_recency_ts,
700                    isolation_level,
701                    &timeline,
702                    largest_not_in_advance_of_upper,
703                )?
704            }
705            ConstraintBasedTimestampSelection::Verify => {
706                let classical_determination = Self::determine_timestamp_classical(
707                    session,
708                    &read_holds,
709                    id_bundle,
710                    when,
711                    oracle_read_ts,
712                    compute_instance,
713                    real_time_recency_ts,
714                    isolation_level,
715                    &timeline,
716                    largest_not_in_advance_of_upper,
717                    &since,
718                );
719
720                let constraint_determination = Self::determine_timestamp_via_constraints(
721                    session,
722                    &read_holds,
723                    id_bundle,
724                    when,
725                    oracle_read_ts,
726                    compute_instance,
727                    real_time_recency_ts,
728                    isolation_level,
729                    &timeline,
730                    largest_not_in_advance_of_upper,
731                );
732
733                match (classical_determination, constraint_determination) {
734                    (Ok(classical_determination), Ok(constraint_determination)) => {
735                        soft_assert_eq_or_log!(
736                            classical_determination.timestamp,
737                            constraint_determination.timestamp,
738                            "timestamp determination mismatch"
739                        );
740                        if classical_determination.timestamp != constraint_determination.timestamp {
741                            tracing::info!(
742                                "timestamp constrains: {:?}",
743                                constraint_determination.constraints
744                            );
745                        }
746                        RawTimestampDetermination {
747                            timestamp: classical_determination.timestamp,
748                            constraints: constraint_determination.constraints,
749                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
750                        }
751                    }
752                    (Err(classical_determination_err), Err(_constraint_determination_err)) => {
753                        // This is ok: The errors don't have to exactly match.
754                        return Err(classical_determination_err);
755                    }
756                    (Ok(classical_determination), Err(constraint_determination_err)) => {
757                        event!(
758                            Level::ERROR,
759                            classical = ?classical_determination,
760                            constraint_based = ?constraint_determination_err,
761                            "classical timestamp determination succeeded, but constraint-based failed"
762                        );
763                        RawTimestampDetermination {
764                            timestamp: classical_determination.timestamp,
765                            constraints: classical_determination.constraints,
766                            session_oracle_read_ts: classical_determination.session_oracle_read_ts,
767                        }
768                    }
769                    (Err(classical_determination_err), Ok(constraint_determination)) => {
770                        event!(
771                            Level::ERROR,
772                            classical = ?classical_determination_err,
773                            constraint_based = ?constraint_determination,
774                            "classical timestamp determination failed, but constraint-based succeeded"
775                        );
776                        return Err(classical_determination_err);
777                    }
778                }
779            }
780        };
781
782        let timestamp_context = TimestampContext::from_timeline_context(
783            raw_determination.timestamp,
784            oracle_read_ts,
785            timeline,
786            timeline_context,
787        );
788
789        let determination = TimestampDetermination {
790            timestamp_context,
791            since,
792            upper,
793            largest_not_in_advance_of_upper,
794            oracle_read_ts,
795            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
796            real_time_recency_ts,
797            constraints: raw_determination.constraints,
798        };
799
800        Ok((determination, read_holds))
801    }
802
    /// Acquires [`ReadHolds`] for the given `id_bundle` at the earliest possible
    /// times.
    ///
    /// This is a required method: no default body is provided here, so how the holds
    /// are acquired is up to the implementor.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
806
807    /// The smallest common valid write frontier among the specified collections.
808    ///
809    /// Times that are not greater or equal to this frontier are complete for all collections
810    /// identified as arguments.
811    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
812        let mut upper = Antichain::new();
813        {
814            for (_id, _since, collection_upper) in
815                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
816            {
817                upper.extend(collection_upper);
818            }
819        }
820        {
821            for (instance, compute_ids) in &id_bundle.compute_ids {
822                for id in compute_ids.iter() {
823                    upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
824                }
825            }
826        }
827        upper
828    }
829
830    /// Returns `least_valid_write` - 1, i.e., each time in `least_valid_write` stepped back in a
831    /// saturating way.
832    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
833        let mut frontier = Antichain::new();
834        for t in self.least_valid_write(id_bundle) {
835            frontier.insert(t.step_back().unwrap_or(t));
836        }
837        frontier
838    }
839}
840
841fn generate_timestamp_not_valid_error_msg(
842    id_bundle: &CollectionIdBundle,
843    compute_instance: ComputeInstanceId,
844    read_holds: &ReadHolds<mz_repr::Timestamp>,
845    candidate: mz_repr::Timestamp,
846) -> String {
847    let mut invalid = Vec::new();
848
849    if let Some(compute_ids) = id_bundle.compute_ids.get(&compute_instance) {
850        for id in compute_ids {
851            let since = read_holds.since(id);
852            if !since.less_equal(&candidate) {
853                invalid.push((*id, since));
854            }
855        }
856    }
857
858    for id in id_bundle.storage_ids.iter() {
859        let since = read_holds.since(id);
860        if !since.less_equal(&candidate) {
861            invalid.push((*id, since));
862        }
863    }
864
865    format!(
866        "Timestamp ({}) is not valid for all inputs: {:?}",
867        candidate, invalid,
868    )
869}
870
871impl Coordinator {
872    pub(crate) async fn oracle_read_ts(
873        &self,
874        session: &Session,
875        timeline_ctx: &TimelineContext,
876        when: &QueryWhen,
877    ) -> Option<Timestamp> {
878        let isolation_level = session.vars().transaction_isolation().clone();
879        let timeline = Coordinator::get_timeline(timeline_ctx);
880        let needs_linearized_read_ts =
881            Coordinator::needs_linearized_read_ts(&isolation_level, when);
882
883        let oracle_read_ts = match timeline {
884            Some(timeline) if needs_linearized_read_ts => {
885                let timestamp_oracle = self.get_timestamp_oracle(&timeline);
886                Some(timestamp_oracle.read_ts().await)
887            }
888            Some(_) | None => None,
889        };
890
891        oracle_read_ts
892    }
893
894    /// Determines the timestamp for a query, acquires read holds that ensure the
895    /// query remains executable at that time, and returns those.
896    /// The caller is responsible for eventually dropping those read holds.
897    #[mz_ore::instrument(level = "debug")]
898    pub(crate) fn determine_timestamp(
899        &self,
900        session: &Session,
901        id_bundle: &CollectionIdBundle,
902        when: &QueryWhen,
903        compute_instance: ComputeInstanceId,
904        timeline_context: &TimelineContext,
905        oracle_read_ts: Option<Timestamp>,
906        real_time_recency_ts: Option<mz_repr::Timestamp>,
907    ) -> Result<
908        (
909            TimestampDetermination<mz_repr::Timestamp>,
910            ReadHolds<mz_repr::Timestamp>,
911        ),
912        AdapterError,
913    > {
914        let constraint_based = ConstraintBasedTimestampSelection::from_str(
915            &CONSTRAINT_BASED_TIMESTAMP_SELECTION
916                .get(self.catalog_state().system_config().dyncfgs()),
917        );
918
919        let isolation_level = session.vars().transaction_isolation();
920        let (det, read_holds) = self.determine_timestamp_for(
921            session,
922            id_bundle,
923            when,
924            compute_instance,
925            timeline_context,
926            oracle_read_ts,
927            real_time_recency_ts,
928            isolation_level,
929            &constraint_based,
930        )?;
931        self.metrics
932            .determine_timestamp
933            .with_label_values(&[
934                match det.respond_immediately() {
935                    true => "true",
936                    false => "false",
937                },
938                isolation_level.as_str(),
939                &compute_instance.to_string(),
940                constraint_based.as_str(),
941            ])
942            .inc();
943        if !det.respond_immediately()
944            && isolation_level == &IsolationLevel::StrictSerializable
945            && real_time_recency_ts.is_none()
946        {
947            // Note down the difference between StrictSerializable and Serializable into a metric.
948            if let Some(strict) = det.timestamp_context.timestamp() {
949                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
950                    session,
951                    id_bundle,
952                    when,
953                    compute_instance,
954                    timeline_context,
955                    oracle_read_ts,
956                    real_time_recency_ts,
957                    &IsolationLevel::Serializable,
958                    &constraint_based,
959                )?;
960
961                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
962                    self.metrics
963                        .timestamp_difference_for_strict_serializable_ms
964                        .with_label_values(&[
965                            compute_instance.to_string().as_ref(),
966                            constraint_based.as_str(),
967                        ])
968                        .observe(f64::cast_lossy(u64::from(
969                            strict.saturating_sub(*serializable),
970                        )));
971                }
972            }
973        }
974        Ok((det, read_holds))
975    }
976
977    /// The largest timestamp not greater or equal to an element of `upper`.
978    ///
979    /// If no such timestamp exists, for example because `upper` contains only the
980    /// minimal timestamp, the return value is `Timestamp::minimum()`.
981    pub(crate) fn largest_not_in_advance_of_upper(
982        upper: &Antichain<mz_repr::Timestamp>,
983    ) -> mz_repr::Timestamp {
984        // We peek at the largest element not in advance of `upper`, which
985        // involves a subtraction. If `upper` contains a zero timestamp there
986        // is no "prior" answer, and we do not want to peek at it as it risks
987        // hanging awaiting the response to data that may never arrive.
988        if let Some(upper) = upper.as_option() {
989            upper.step_back().unwrap_or_else(Timestamp::minimum)
990        } else {
991            // A complete trace can be read in its final form with this time.
992            //
993            // This should only happen for literals that have no sources or sources that
994            // are known to have completed (non-tailed files for example).
995            Timestamp::MAX
996        }
997    }
998}
999
/// Information used when determining the timestamp for a query.
///
/// Bundles the chosen timestamp context with the frontiers and oracle timestamps that
/// were available while choosing it.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp context from `determine_timestamp`.
    pub timestamp_context: TimestampContext<T>,
    /// The read frontier of all involved sources.
    pub since: Antichain<T>,
    /// The write frontier of all involved sources.
    pub upper: Antichain<T>,
    /// The largest timestamp not in advance of upper.
    pub largest_not_in_advance_of_upper: T,
    /// The value of the timeline's oracle timestamp, if used.
    pub oracle_read_ts: Option<T>,
    /// The value of the session local timestamp's oracle timestamp, if used.
    pub session_oracle_read_ts: Option<T>,
    /// The value of the real time recency timestamp, if used.
    pub real_time_recency_ts: Option<T>,
    /// The constraints used by the constraint based solver.
    /// See the [`constraints`] module for more information.
    ///
    /// `None` when the determination carries no constraints.
    pub constraints: Option<Constraints>,
}
1021
1022impl<T: TimestampManipulation> TimestampDetermination<T> {
1023    pub fn respond_immediately(&self) -> bool {
1024        match &self.timestamp_context {
1025            TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
1026                !self.upper.less_equal(chosen_ts)
1027            }
1028            TimestampContext::NoTimestamp => true,
1029        }
1030    }
1031}
1032
/// An explanation of how the timestamp for a query was determined.
///
/// Combines the [`TimestampDetermination`] with per-source frontier details. The
/// `Display` implementation renders all of it as a multi-line, human-readable report.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
    /// The chosen timestamp from `determine_timestamp`.
    pub determination: TimestampDetermination<T>,
    /// Details about each source.
    pub sources: Vec<TimestampSource<T>>,
    /// Wall time of first statement executed in this transaction
    pub session_wall_time: DateTime<Utc>,
    /// Cached value of determination.respond_immediately()
    pub respond_immediately: bool,
}
1045
/// The frontiers of a single source, as listed in a [`TimestampExplanation`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
    /// The name under which the source is listed in the explanation.
    pub name: String,
    /// The times making up the source's read frontier.
    pub read_frontier: Vec<T>,
    /// The times making up the source's write frontier.
    pub write_frontier: Vec<T>,
}
1052
/// Values whose textual representation may depend on the [`Timeline`] they belong to.
pub trait DisplayableInTimeline {
    /// Writes `self` to `f`, taking the optional `timeline` into account.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Pairs `self` with `timeline` in a [`DisplayInTimeline`] adapter, whose `Display`
    /// and `Debug` implementations format the value via [`DisplayableInTimeline::fmt`].
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
1059
1060impl DisplayableInTimeline for mz_repr::Timestamp {
1061    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1062        if let Some(Timeline::EpochMilliseconds) = timeline {
1063            let ts_ms: u64 = self.into();
1064            if let Ok(ts_ms) = i64::try_from(ts_ms) {
1065                if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
1066                    return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
1067                }
1068            }
1069        }
1070        write!(f, "{:13}", self)
1071    }
1072}
1073
/// Adapter that pairs a value with an optional [`Timeline`] so that the value can be
/// handed to the standard formatting machinery. Created by
/// [`DisplayableInTimeline::display`].
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to format.
    t: &'a T,
    /// The timeline to format the value in, if any.
    timeline: Option<&'a Timeline>,
}
1078impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
1079where
1080    T: DisplayableInTimeline,
1081{
1082    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1083        self.t.fmt(self.timeline, f)
1084    }
1085}
1086
1087impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
1088where
1089    T: DisplayableInTimeline,
1090{
1091    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1092        fmt::Display::fmt(&self, f)
1093    }
1094}
1095
1096impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
1097    for TimestampExplanation<T>
1098{
1099    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1100        let timeline = self.determination.timestamp_context.timeline();
1101        writeln!(
1102            f,
1103            "                query timestamp: {}",
1104            self.determination
1105                .timestamp_context
1106                .timestamp_or_default()
1107                .display(timeline)
1108        )?;
1109        if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
1110            writeln!(
1111                f,
1112                "          oracle read timestamp: {}",
1113                oracle_read_ts.display(timeline)
1114            )?;
1115        }
1116        if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
1117            writeln!(
1118                f,
1119                "  session oracle read timestamp: {}",
1120                session_oracle_read_ts.display(timeline)
1121            )?;
1122        }
1123        if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
1124            writeln!(
1125                f,
1126                "    real time recency timestamp: {}",
1127                real_time_recency_ts.display(timeline)
1128            )?;
1129        }
1130        writeln!(
1131            f,
1132            "largest not in advance of upper: {}",
1133            self.determination
1134                .largest_not_in_advance_of_upper
1135                .display(timeline),
1136        )?;
1137        writeln!(
1138            f,
1139            "                          upper:{:?}",
1140            self.determination
1141                .upper
1142                .iter()
1143                .map(|t| t.display(timeline))
1144                .collect::<Vec<_>>()
1145        )?;
1146        writeln!(
1147            f,
1148            "                          since:{:?}",
1149            self.determination
1150                .since
1151                .iter()
1152                .map(|t| t.display(timeline))
1153                .collect::<Vec<_>>()
1154        )?;
1155        writeln!(
1156            f,
1157            "        can respond immediately: {}",
1158            self.respond_immediately
1159        )?;
1160        writeln!(f, "                       timeline: {:?}", &timeline)?;
1161        writeln!(
1162            f,
1163            "              session wall time: {:13} ({})",
1164            self.session_wall_time.timestamp_millis(),
1165            self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
1166        )?;
1167
1168        for source in &self.sources {
1169            writeln!(f, "")?;
1170            writeln!(f, "source {}:", source.name)?;
1171            writeln!(
1172                f,
1173                "                  read frontier:{:?}",
1174                source
1175                    .read_frontier
1176                    .iter()
1177                    .map(|t| t.display(timeline))
1178                    .collect::<Vec<_>>()
1179            )?;
1180            writeln!(
1181                f,
1182                "                 write frontier:{:?}",
1183                source
1184                    .write_frontier
1185                    .iter()
1186                    .map(|t| t.display(timeline))
1187                    .collect::<Vec<_>>()
1188            )?;
1189        }
1190
1191        if let Some(constraints) = &self.determination.constraints {
1192            writeln!(f, "")?;
1193            writeln!(f, "binding constraints:")?;
1194            write!(f, "{}", constraints.display(timeline))?;
1195        }
1196
1197        Ok(())
1198    }
1199}
1200
1201/// Types and logic in support of a constraint-based approach to timestamp determination.
1202mod constraints {
1203
1204    use core::fmt;
1205    use std::fmt::Debug;
1206
1207    use differential_dataflow::lattice::Lattice;
1208    use mz_storage_types::sources::Timeline;
1209    use serde::{Deserialize, Serialize};
1210    use timely::progress::{Antichain, Timestamp};
1211
1212    use mz_compute_types::ComputeInstanceId;
1213    use mz_repr::GlobalId;
1214    use mz_sql::session::vars::IsolationLevel;
1215
1216    use super::DisplayableInTimeline;
1217
1218    /// Constraints expressed on the timestamp of a query.
1219    ///
1220    /// The constraints are expressed on the minimum and maximum values,
1221    /// resulting in a (possibly empty) interval of valid timestamps.
1222    ///
1223    /// The constraints may be redundant, in the interest of providing
1224    /// more complete explanations, but they may also be minimized at
1225    /// any point without altering their behavior by removing redundant
1226    /// constraints.
1227    ///
1228    /// When combined with a `Preference` one can determine an
1229    /// ideal timestamp to use.
1230    #[derive(Default, Serialize, Deserialize, Clone)]
1231    pub struct Constraints {
1232        /// Timestamps and reasons that impose an inclusive lower bound.
1233        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1234        /// Timestamps and reasons that impose an inclusive upper bound.
1235        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1236    }
1237
1238    impl DisplayableInTimeline for Constraints {
1239        fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1240            if !self.lower.is_empty() {
1241                writeln!(f, "lower:")?;
1242                for (ts, reason) in &self.lower {
1243                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1244                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1245                }
1246            }
1247            if !self.upper.is_empty() {
1248                writeln!(f, "upper:")?;
1249                for (ts, reason) in &self.upper {
1250                    let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1251                    writeln!(f, "  ({:?}): {:?}", reason, ts)?;
1252                }
1253            }
1254            Ok(())
1255        }
1256    }
1257
1258    impl Debug for Constraints {
1259        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1260            self.display(None).fmt(f)?;
1261            Ok(())
1262        }
1263    }
1264
1265    impl Constraints {
1266        /// Remove constraints that are dominated by other constraints.
1267        ///
1268        /// This removes redundant constraints, without removing constraints
1269        /// that are "tight" in the sense that the interval would be
1270        /// meaningfully different without them.
1271        /// For example, two constraints at the same
1272        /// time will both be retained, in the interest of full information.
1273        /// But a lower bound constraint at time `t` will be removed if there is a
1274        /// constraint at time `t + 1` (or any larger time).
1275        pub fn minimize(&mut self) {
1276            // Establish the upper bound of lower constraints.
1277            let lower_frontier = self.lower_bound();
1278            // Retain constraints that intersect `lower_frontier`.
1279            self.lower.retain(|(anti, _)| {
1280                anti.iter()
1281                    .any(|time| lower_frontier.elements().contains(time))
1282            });
1283
1284            // Establish the lower bound of upper constraints.
1285            let upper_frontier = self.upper_bound();
1286            // Retain constraints that intersect `upper_frontier`.
1287            self.upper.retain(|(anti, _)| {
1288                anti.iter()
1289                    .any(|time| upper_frontier.elements().contains(time))
1290            });
1291        }
1292
1293        /// An antichain equal to the least upper bound of lower bounds.
1294        pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1295            let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1296            for (anti, _) in self.lower.iter() {
1297                lower = lower.join(anti);
1298            }
1299            lower
1300        }
1301        /// An antichain equal to the greatest lower bound of upper bounds.
1302        pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1303            self.upper
1304                .iter()
1305                .flat_map(|(anti, _)| anti.iter())
1306                .cloned()
1307                .collect()
1308        }
1309    }
1310
1311    /// An explanation of reasons for a timestamp constraint.
1312    #[derive(Serialize, Deserialize, Clone)]
1313    pub enum Reason {
1314        /// A compute input at a compute instance.
1315        /// This is something like an index or view
1316        /// that is mantained by compute.
1317        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1318        /// A storage input.
1319        StorageInput(Vec<GlobalId>),
1320        /// A specified isolation level and the timestamp it requires.
1321        IsolationLevel(IsolationLevel),
1322        /// Real-time recency may constrains the timestamp from below.
1323        RealTimeRecency,
1324        /// The query expressed its own constraint on the timestamp.
1325        QueryAsOf,
1326    }
1327
1328    impl Debug for Reason {
1329        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1330            match self {
1331                Reason::ComputeInput(ids) => write_split_ids(f, "ComputeInput", ids),
1332                Reason::StorageInput(ids) => write_split_ids(f, "StorageInput", ids),
1333                Reason::IsolationLevel(level) => {
1334                    write!(f, "IsolationLevel({:?})", level)
1335                }
1336                Reason::RealTimeRecency => {
1337                    write!(f, "RealTimeRecency")
1338                }
1339                Reason::QueryAsOf => {
1340                    write!(f, "QueryAsOf")
1341                }
1342            }
1343        }
1344    }
1345
1346    //TODO: This is a bit of a hack to make the debug output of constraints more readable.
1347    //We should probably have a more structured way to do this.
1348    fn write_split_ids<T: Debug>(f: &mut fmt::Formatter, label: &str, ids: &[T]) -> fmt::Result {
1349        let (ids, rest) = if ids.len() > 10 {
1350            ids.split_at(10)
1351        } else {
1352            let rest: &[T] = &[];
1353            (ids, rest)
1354        };
1355        if rest.is_empty() {
1356            write!(f, "{}({:?})", label, ids)
1357        } else {
1358            write!(f, "{}({:?}, ... {} more)", label, ids, rest.len())
1359        }
1360    }
1361
1362    /// Given an interval [read, write) of timestamp options,
1363    /// this expresses a preference for either end of the spectrum.
1364    pub enum Preference {
1365        /// Prefer the greatest timestamp immediately available.
1366        ///
1367        /// This considers the immediate inputs to a query and
1368        /// selects the greatest timestamp not greater or equal
1369        /// to any of their write frontiers.
1370        ///
1371        /// The preference only relates to immediate query inputs,
1372        /// but it could be extended to transitive inputs as well.
1373        /// For example, one could imagine prefering the freshest
1374        /// data known to be ingested into Materialize, under the
1375        /// premise that those answers should soon become available,
1376        /// and may be more fresh than the immediate inputs.
1377        FreshestAvailable,
1378        /// Prefer the least valid timeastamp.
1379        ///
1380        /// This is useful when one has no expressed freshness
1381        /// constraints, and wants to minimally impact others.
1382        /// For example, `AS OF AT LEAST <time>`.
1383        StalestValid,
1384    }
1385}