Skip to main content

mz_adapter/coord/
timestamp_selection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for selecting timestamps for various operations on collections.
11
12use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::cast::CastLossy;
21use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
22use mz_sql::plan::QueryWhen;
23use mz_sql::session::vars::IsolationLevel;
24use mz_storage_types::sources::Timeline;
25use serde::{Deserialize, Serialize};
26use timely::progress::{Antichain, Timestamp as _};
27
28use crate::AdapterError;
29use crate::catalog::CatalogState;
30use crate::coord::Coordinator;
31use crate::coord::id_bundle::CollectionIdBundle;
32use crate::coord::read_policy::ReadHolds;
33use crate::coord::timeline::TimelineContext;
34use crate::session::Session;
35
/// The timeline and timestamp context of a read.
///
/// Reads that do not depend on any timeline are represented by `NoTimestamp`
/// (see `TimestampContext::from_timeline_context`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext {
    /// Read is executed in a specific timeline with a specific timestamp.
    TimelineTimestamp {
        /// The timeline in which the read is executed.
        timeline: Timeline,
        /// The timestamp that was chosen for a read. This can differ from the
        /// `oracle_ts` when collections are not readable at the (linearized)
        /// timestamp for the oracle. In those cases (when the chosen timestamp
        /// is further ahead than the oracle timestamp) we have to delay
        /// returning peek results until the timestamp oracle is also
        /// sufficiently advanced.
        chosen_ts: Timestamp,
        /// The timestamp that would have been chosen for the read by the
        /// (linearized) timestamp oracle. In most cases this will be picked as
        /// the `chosen_ts`. `None` when no linearized oracle read timestamp was
        /// obtained for this read.
        oracle_ts: Option<Timestamp>,
    },
    /// Read is executed without a timeline or timestamp.
    NoTimestamp,
}
57
58impl TimestampContext {
59    /// Creates a `TimestampContext` from a timestamp and `TimelineContext`.
60    pub fn from_timeline_context(
61        chosen_ts: Timestamp,
62        oracle_ts: Option<Timestamp>,
63        transaction_timeline: Option<Timeline>,
64        timeline_context: &TimelineContext,
65    ) -> TimestampContext {
66        match timeline_context {
67            TimelineContext::TimelineDependent(timeline) => {
68                if let Some(transaction_timeline) = transaction_timeline {
69                    assert_eq!(timeline, &transaction_timeline);
70                }
71                Self::TimelineTimestamp {
72                    timeline: timeline.clone(),
73                    chosen_ts,
74                    oracle_ts,
75                }
76            }
77            TimelineContext::TimestampDependent => {
78                // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
79                Self::TimelineTimestamp {
80                    timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
81                    chosen_ts,
82                    oracle_ts,
83                }
84            }
85            TimelineContext::TimestampIndependent => Self::NoTimestamp,
86        }
87    }
88
89    /// The timeline belonging to this context, if one exists.
90    pub fn timeline(&self) -> Option<&Timeline> {
91        self.timeline_timestamp().map(|tt| tt.0)
92    }
93
94    /// The timestamp belonging to this context, if one exists.
95    pub fn timestamp(&self) -> Option<&Timestamp> {
96        self.timeline_timestamp().map(|tt| tt.1)
97    }
98
99    /// The timeline and timestamp belonging to this context, if one exists.
100    pub fn timeline_timestamp(&self) -> Option<(&Timeline, &Timestamp)> {
101        match self {
102            Self::TimelineTimestamp {
103                timeline,
104                chosen_ts,
105                ..
106            } => Some((timeline, chosen_ts)),
107            Self::NoTimestamp => None,
108        }
109    }
110
111    /// The timestamp belonging to this context, or a sensible default if one does not exists.
112    pub fn timestamp_or_default(&self) -> Timestamp {
113        match self {
114            Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
115            // Anything without a timestamp is given the maximum possible timestamp to indicate
116            // that they have been closed up until the end of time. This allows us to SUBSCRIBE to
117            // static views.
118            Self::NoTimestamp => Timestamp::maximum(),
119        }
120    }
121
122    /// Whether or not the context contains a timestamp.
123    pub fn contains_timestamp(&self) -> bool {
124        self.timestamp().is_some()
125    }
126
127    /// Converts this `TimestampContext` to an `Antichain`.
128    pub fn antichain(&self) -> Antichain<Timestamp> {
129        Antichain::from_elem(self.timestamp_or_default())
130    }
131}
132
133#[async_trait(?Send)]
134impl TimestampProvider for Coordinator {
135    /// Reports a collection's current read frontier.
136    fn compute_read_frontier(
137        &self,
138        instance: ComputeInstanceId,
139        id: GlobalId,
140    ) -> Antichain<Timestamp> {
141        self.controller
142            .compute
143            .collection_frontiers(id, Some(instance))
144            .expect("id does not exist")
145            .read_frontier
146    }
147
148    /// Reports a collection's current write frontier.
149    fn compute_write_frontier(
150        &self,
151        instance: ComputeInstanceId,
152        id: GlobalId,
153    ) -> Antichain<Timestamp> {
154        self.controller
155            .compute
156            .collection_frontiers(id, Some(instance))
157            .expect("id does not exist")
158            .write_frontier
159    }
160
161    fn storage_frontiers(
162        &self,
163        ids: Vec<GlobalId>,
164    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
165        self.controller
166            .storage
167            .collections_frontiers(ids)
168            .expect("missing collections")
169    }
170
171    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds {
172        self.acquire_read_holds(id_bundle)
173    }
174
175    fn catalog_state(&self) -> &CatalogState {
176        self.catalog().state()
177    }
178}
179
/// A timestamp determination, which includes the timestamp, constraints, and session oracle read
/// timestamp.
///
/// This is the result of `TimestampProvider::determine_timestamp_via_constraints`, before it
/// is combined with frontier information into a `TimestampDetermination`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination {
    /// The timestamp chosen for the read.
    pub timestamp: Timestamp,
    /// The (minimized) constraints that were applied when choosing `timestamp`.
    pub constraints: Constraints,
    /// The session-local timestamp oracle's read timestamp. Only set under Strong Session
    /// Serializable, and only when the session has an oracle for the read's timeline.
    pub session_oracle_read_ts: Option<Timestamp>,
}
188
189#[async_trait(?Send)]
190pub trait TimestampProvider {
191    fn compute_read_frontier(
192        &self,
193        instance: ComputeInstanceId,
194        id: GlobalId,
195    ) -> Antichain<Timestamp>;
196    fn compute_write_frontier(
197        &self,
198        instance: ComputeInstanceId,
199        id: GlobalId,
200    ) -> Antichain<Timestamp>;
201
202    /// Returns the implied capability (since) and write frontier (upper) for
203    /// the specified storage collections.
204    fn storage_frontiers(
205        &self,
206        ids: Vec<GlobalId>,
207    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
208
209    fn catalog_state(&self) -> &CatalogState;
210
211    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
212        let timeline = match timeline_context {
213            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
214            // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
215            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
216            TimelineContext::TimestampIndependent => None,
217        };
218
219        timeline
220    }
221
222    /// Returns true if-and-only-if the given configuration needs a linearized
223    /// read timestamp from a timestamp oracle.
224    ///
225    /// This assumes that the query happens in the context of a timeline. If
226    /// there is no timeline, we cannot and don't have to get a linearized read
227    /// timestamp.
228    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
229        // When we're in the context of a timeline (assumption) and one of these
230        // scenarios hold, we need to use a linearized read timestamp:
231        // - The isolation level anchors against the oracle (Strict Serializable,
232        //   Strong Session Serializable, Bounded Staleness) and the `when` allows
233        //   us to use the timestamp oracle (ex: queries with no AS OF).
234        // - The `when` requires us to use the timestamp oracle (ex: read-then-write
235        //   queries).
236        when.must_advance_to_timeline_ts()
237            || (when.can_advance_to_timeline_ts()
238                && matches!(
239                    isolation_level,
240                    IsolationLevel::StrictSerializable
241                        | IsolationLevel::StrongSessionSerializable
242                        | IsolationLevel::BoundedStaleness(_)
243                ))
244    }
245
246    /// Uses constraints and preferences to determine a timestamp for a query.
247    /// Returns the determined timestamp, the constraints that were applied, and
248    /// session_oracle_read_ts.
249    fn determine_timestamp_via_constraints(
250        session: &Session,
251        read_holds: &ReadHolds,
252        id_bundle: &CollectionIdBundle,
253        when: &QueryWhen,
254        oracle_read_ts: Option<Timestamp>,
255        real_time_recency_ts: Option<Timestamp>,
256        isolation_level: &IsolationLevel,
257        timeline: &Option<Timeline>,
258        largest_not_in_advance_of_upper: Timestamp,
259    ) -> Result<RawTimestampDetermination, AdapterError> {
260        use constraints::{Constraints, Preference, Reason};
261
262        let mut session_oracle_read_ts = None;
263        // We start by establishing the hard constraints that must be applied to timestamp determination.
264        // These constraints are derived from the input arguments, and properties of the collections involved.
265        // TODO: Many of the constraints are expressed obliquely, and could be made more direct.
266        let constraints = {
267            // Constraints we will populate through a sequence of opinions.
268            let mut constraints = Constraints::default();
269
270            // First, we have validity constraints from the `id_bundle` argument which indicates
271            // which collections we are reading from.
272            // TODO: Refine the detail about which identifiers are binding and which are not.
273            // TODO(dov): It's not entirely clear to me that there ever would be a non
274            // binding constraint introduced by the `id_bundle`. We should revisit this.
275            let since = read_holds.least_valid_read();
276            let storage = id_bundle
277                .storage_ids
278                .iter()
279                .cloned()
280                .collect::<Vec<GlobalId>>();
281            if !storage.is_empty() {
282                constraints
283                    .lower
284                    .push((since.clone(), Reason::StorageInput(storage)));
285            }
286            let compute = id_bundle
287                .compute_ids
288                .iter()
289                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
290                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
291            if !compute.is_empty() {
292                constraints
293                    .lower
294                    .push((since.clone(), Reason::ComputeInput(compute)));
295            }
296
297            // The query's `when` may indicates a specific timestamp we must advance to, or a specific value we must use.
298            if let Some(ts) = when.advance_to_timestamp() {
299                constraints
300                    .lower
301                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
302                // If the query is at a specific timestamp, we must introduce an upper bound as well.
303                if when.constrains_upper() {
304                    constraints
305                        .upper
306                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
307                }
308            }
309
310            // The specification of an `oracle_read_ts` may indicates that we must advance to it,
311            // except in some isolation modes, or if `when` does not indicate that we should.
312            // At the moment, only `QueryWhen::FreshestTableWrite` indicates that we should.
313            // TODO: Should this just depend on the isolation level?
314            if let Some(timestamp) = &oracle_read_ts {
315                // Whether this isolation level treats `oracle_read_ts` as a hard
316                // lower bound. Strong session serializable (session-local oracle)
317                // and bounded staleness (`oracle - D` anchor) instead consult it
318                // below; pushing it here would shadow those semantics.
319                let hard_lower_bound = match isolation_level {
320                    IsolationLevel::StrongSessionSerializable
321                    | IsolationLevel::BoundedStaleness(_) => false,
322                    IsolationLevel::ReadUncommitted
323                    | IsolationLevel::ReadCommitted
324                    | IsolationLevel::RepeatableRead
325                    | IsolationLevel::Serializable
326                    | IsolationLevel::StrictSerializable => true,
327                };
328                // `must_advance_to_timeline_ts()` (only `FreshestTableWrite`)
329                // forces the bound regardless; bounded staleness rejects writes
330                // upstream, so that path is unreachable for it in practice.
331                if hard_lower_bound || when.must_advance_to_timeline_ts() {
332                    constraints.lower.push((
333                        Antichain::from_elem(*timestamp),
334                        Reason::IsolationLevel(*isolation_level),
335                    ));
336                }
337            }
338
339            // If a real time recency timestamp is supplied, we must advance to it.
340            if let Some(real_time_recency_ts) = real_time_recency_ts {
341                assert!(
342                    session.vars().real_time_recency()
343                        && isolation_level == &IsolationLevel::StrictSerializable,
344                    "real time recency timestamp should only be supplied when real time recency \
345                                is enabled and the isolation level is strict serializable"
346                );
347                constraints.lower.push((
348                    Antichain::from_elem(real_time_recency_ts),
349                    Reason::RealTimeRecency,
350                ));
351            }
352
353            // Bounded staleness anchors the freshness floor at `oracle.read_ts - D`
354            // and clamps the upper at `largest_not_in_advance_of_upper`. The upper
355            // clamp is what makes an infeasible bound fail fast in the coordinator
356            // instead of blocking on compute for the upper to advance — which would
357            // make the failure mode cluster-shape-dependent. `oracle_read_ts` is
358            // `None` under `AS OF`, where the user-chosen `T` is the only constraint.
359            if let IsolationLevel::BoundedStaleness(d) = isolation_level {
360                if let Some(anchor) = oracle_read_ts {
361                    let bound_ms = u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
362                    let lower = anchor.saturating_sub(bound_ms);
363                    constraints.lower.push((
364                        Antichain::from_elem(lower),
365                        Reason::IsolationLevel(*isolation_level),
366                    ));
367                    constraints.upper.push((
368                        Antichain::from_elem(largest_not_in_advance_of_upper),
369                        Reason::IsolationLevel(*isolation_level),
370                    ));
371                }
372            }
373
374            // If we are operating in Strong Session Serializable, we use an alternate timestamp lower bound.
375            if isolation_level == &IsolationLevel::StrongSessionSerializable {
376                if let Some(timeline) = &timeline {
377                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
378                        let session_ts = oracle.read_ts();
379                        constraints.lower.push((
380                            Antichain::from_elem(session_ts),
381                            Reason::IsolationLevel(*isolation_level),
382                        ));
383                        session_oracle_read_ts = Some(session_ts);
384                    }
385
386                    // When advancing the read timestamp under Strong Session Serializable, there is a
387                    // trade-off to make between freshness and latency. We can choose a timestamp close the
388                    // `upper`, but then later queries might block if the `upper` is too far into the
389                    // future. We can chose a timestamp close to the current time, but then we may not be
390                    // getting results that are as fresh as possible. As a heuristic, we choose the minimum
391                    // of now and the upper, where we use the global timestamp oracle read timestamp as a
392                    // proxy for now. If upper > now, then we choose now and prevent blocking future
393                    // queries. If upper < now, then we choose the upper and prevent blocking the current
394                    // query.
395                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
396                        let mut advance_to = largest_not_in_advance_of_upper;
397                        if let Some(oracle_read_ts) = oracle_read_ts {
398                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
399                        }
400                        constraints.lower.push((
401                            Antichain::from_elem(advance_to),
402                            Reason::IsolationLevel(*isolation_level),
403                        ));
404                    }
405                }
406            }
407
408            constraints.minimize();
409            constraints
410        };
411
412        // Next we establish the preferences that we would like to apply to timestamp determination.
413        // Generally, we want to choose the freshest timestamp possible, although there are exceptions
414        // when we either want a maximally *stale* timestamp, or we want to protect other queries from
415        // a recklessly advanced timestamp.
416        let preferences = {
417            // Counter-intuitively, the only `when` that allows `can_advance_to_upper` is `Immediately`,
418            // and not `FreshestTableWrite`. This is because `FreshestTableWrite` instead imposes a lower
419            // bound through the `oracle_read_ts`, and then requires the stalest valid timestamp.
420
421            if when.can_advance_to_upper()
422                && (isolation_level == &IsolationLevel::Serializable
423                    || matches!(isolation_level, IsolationLevel::BoundedStaleness(_))
424                    || timeline.is_none())
425            {
426                Preference::FreshestAvailable
427            } else {
428                Preference::StalestValid
429            }
430
431            // TODO: `StrongSessionSerializable` has a different set of preferences that starts to tease
432            // out the trade-off between freshness and responsiveness. I think we don't yet know enough
433            // to properly frame these preferences, though they are clearly aimed at the right concerns.
434        };
435
436        // Determine a candidate based on constraints and preferences.
437        let constraint_candidate = {
438            let mut candidate = Timestamp::minimum();
439            // Note: These `advance_by` calls are no-ops if the given frontier is `[]`.
440            candidate.advance_by(constraints.lower_bound().borrow());
441            // If we have a preference to be the freshest available, advance to the minimum
442            // of the upper bound constraints and the `largest_not_in_advance_of_upper`.
443            if let Preference::FreshestAvailable = preferences {
444                let mut upper_bound = constraints.upper_bound();
445                upper_bound.insert(largest_not_in_advance_of_upper);
446                candidate.advance_by(upper_bound.borrow());
447            }
448            // If the candidate is strictly outside the constraints, we didn't have a viable
449            // timestamp. This can happen e.g. when the query has AS OF, or when the lower bound is
450            // `[]`.
451            if !constraints.lower_bound().less_equal(&candidate)
452                || constraints.upper_bound().less_than(&candidate)
453            {
454                // Bounded staleness wants a specific error describing the staleness
455                // gap. Derive it directly from `anchor - D` and the inputs' upper,
456                // not from `constraints.{lower,upper}_bound()` — those join *all*
457                // reasons (`since`, `AS OF`, …), so a non-bs constraint could
458                // dominate and the reported gap would not describe the bs failure.
459                // A zero gap means the bs floor was satisfiable and something else
460                // caused the infeasibility; fall through to the generic error.
461                // `oracle_read_ts` is unset under `AS OF`, where bs added no
462                // constraint, so we cannot reach here for a bs-specific failure.
463                if let IsolationLevel::BoundedStaleness(d) = isolation_level {
464                    if let Some(anchor) = oracle_read_ts {
465                        let bound_ms = u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
466                        let bs_lower: u64 = anchor.saturating_sub(bound_ms).into();
467                        let upper: u64 = largest_not_in_advance_of_upper.into();
468                        let gap = bs_lower.saturating_sub(upper);
469                        if gap > 0 {
470                            return Err(AdapterError::BoundedStalenessExceeded {
471                                bound: *d,
472                                gap_ms: gap,
473                                slowest_input: None,
474                            });
475                        }
476                    }
477                }
478                return Err(AdapterError::ImpossibleTimestampConstraints {
479                    constraints: constraints.display(timeline.as_ref()).to_string(),
480                });
481            } else {
482                candidate
483            }
484        };
485
486        Ok(RawTimestampDetermination {
487            timestamp: constraint_candidate,
488            constraints,
489            session_oracle_read_ts,
490        })
491    }
492
493    /// Determines the timestamp for a query.
494    ///
495    /// Timestamp determination may fail due to the restricted validity of
496    /// traces. Each has a `since` and `upper` frontier, and are only valid
497    /// after `since` and sure to be available not after `upper`.
498    ///
499    /// The timeline that `id_bundle` belongs to is also returned, if one exists.
500    fn determine_timestamp_for(
501        &self,
502        session: &Session,
503        id_bundle: &CollectionIdBundle,
504        when: &QueryWhen,
505        timeline_context: &TimelineContext,
506        oracle_read_ts: Option<Timestamp>,
507        real_time_recency_ts: Option<Timestamp>,
508        isolation_level: &IsolationLevel,
509    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
510        // First, we acquire read holds that will ensure the queried collections
511        // stay queryable at the chosen timestamp.
512        let read_holds = self.acquire_read_holds(id_bundle);
513
514        let upper = self.least_valid_write(id_bundle);
515
516        Self::determine_timestamp_for_inner(
517            session,
518            id_bundle,
519            when,
520            timeline_context,
521            oracle_read_ts,
522            real_time_recency_ts,
523            isolation_level,
524            read_holds,
525            upper,
526        )
527    }
528
529    /// Same as determine_timestamp_for, but read_holds and least_valid_write are already passed in.
530    fn determine_timestamp_for_inner(
531        session: &Session,
532        id_bundle: &CollectionIdBundle,
533        when: &QueryWhen,
534        timeline_context: &TimelineContext,
535        oracle_read_ts: Option<Timestamp>,
536        real_time_recency_ts: Option<Timestamp>,
537        isolation_level: &IsolationLevel,
538        read_holds: ReadHolds,
539        upper: Antichain<Timestamp>,
540    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
541        let timeline = Self::get_timeline(timeline_context);
542        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
543        let since = read_holds.least_valid_read();
544
545        // If the `since` is empty, then timestamp determination would fail. Let's return a more
546        // specific error in this case: Empty `since` frontiers happen here when collections were
547        // dropped concurrently with sequencing the query.
548        if since.is_empty() {
549            // Figure out what made the since frontier empty.
550            let mut unreadable_collections = Vec::new();
551            for (coll_id, hold) in read_holds.storage_holds {
552                if hold.since().is_empty() {
553                    unreadable_collections.push(coll_id);
554                }
555            }
556            for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
557                if hold.since().is_empty() {
558                    unreadable_collections.push(coll_id);
559                }
560            }
561            return Err(AdapterError::CollectionUnreadable {
562                id: unreadable_collections.into_iter().join(", "),
563            });
564        }
565
566        // Bounded staleness freshness math assumes the EpochMilliseconds timeline,
567        // where timestamps are wall-clock milliseconds. Timeline-less queries
568        // (`TimestampIndependent`, e.g. constant queries) are fine — the freshness
569        // contract is vacuous for them.
570        if isolation_level.is_bounded_staleness()
571            && matches!(&timeline, Some(t) if *t != Timeline::EpochMilliseconds)
572        {
573            return Err(AdapterError::BoundedStalenessTimelineUnsupported);
574        }
575
576        let raw_determination = Self::determine_timestamp_via_constraints(
577            session,
578            &read_holds,
579            id_bundle,
580            when,
581            oracle_read_ts,
582            real_time_recency_ts,
583            isolation_level,
584            &timeline,
585            largest_not_in_advance_of_upper,
586        )?;
587
588        let timestamp_context = TimestampContext::from_timeline_context(
589            raw_determination.timestamp,
590            oracle_read_ts,
591            timeline,
592            timeline_context,
593        );
594
595        let determination = TimestampDetermination {
596            timestamp_context,
597            since,
598            upper,
599            largest_not_in_advance_of_upper,
600            oracle_read_ts,
601            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
602            real_time_recency_ts,
603            constraints: raw_determination.constraints,
604        };
605
606        Ok((determination, read_holds))
607    }
608
609    /// Acquires [ReadHolds], for the given `id_bundle` at the earliest possible
610    /// times.
611    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds;
612
613    /// The smallest common valid write frontier among the specified collections.
614    ///
615    /// Times that are not greater or equal to this frontier are complete for all collections
616    /// identified as arguments.
617    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
618        let mut upper = Antichain::new();
619        {
620            for (_id, _since, collection_upper) in
621                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
622            {
623                upper.extend(collection_upper);
624            }
625        }
626        {
627            for (instance, compute_ids) in &id_bundle.compute_ids {
628                for id in compute_ids.iter() {
629                    upper.extend(self.compute_write_frontier(*instance, *id));
630                }
631            }
632        }
633        upper
634    }
635
636    /// Returns `least_valid_write` - 1, i.e., each time in `least_valid_write` stepped back in a
637    /// saturating way.
638    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
639        let mut frontier = Antichain::new();
640        for t in self.least_valid_write(id_bundle) {
641            frontier.insert(t.step_back().unwrap_or(t));
642        }
643        frontier
644    }
645}
646
647impl Coordinator {
648    pub(crate) async fn oracle_read_ts(
649        &self,
650        session: &Session,
651        timeline_ctx: &TimelineContext,
652        when: &QueryWhen,
653    ) -> Option<Timestamp> {
654        let isolation_level = session.vars().transaction_isolation().clone();
655        let timeline = Coordinator::get_timeline(timeline_ctx);
656        let needs_linearized_read_ts =
657            Coordinator::needs_linearized_read_ts(&isolation_level, when);
658
659        let oracle_read_ts = match timeline {
660            Some(timeline) if needs_linearized_read_ts => {
661                let timestamp_oracle = self.get_timestamp_oracle(&timeline);
662                Some(timestamp_oracle.read_ts().await)
663            }
664            Some(_) | None => None,
665        };
666
667        oracle_read_ts
668    }
669
670    /// Determines the timestamp for a query, acquires read holds that ensure the
671    /// query remains executable at that time, and returns those.
672    /// The caller is responsible for eventually dropping those read holds.
673    #[mz_ore::instrument(level = "debug")]
674    pub(crate) fn determine_timestamp(
675        &self,
676        session: &Session,
677        id_bundle: &CollectionIdBundle,
678        when: &QueryWhen,
679        compute_instance: ComputeInstanceId,
680        timeline_context: &TimelineContext,
681        oracle_read_ts: Option<Timestamp>,
682        real_time_recency_ts: Option<mz_repr::Timestamp>,
683    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
684        let isolation_level = session.vars().transaction_isolation();
685        let (det, read_holds) = self.determine_timestamp_for(
686            session,
687            id_bundle,
688            when,
689            timeline_context,
690            oracle_read_ts,
691            real_time_recency_ts,
692            isolation_level,
693        )?;
694        self.metrics
695            .determine_timestamp
696            .with_label_values(&[
697                match det.respond_immediately() {
698                    true => "true",
699                    false => "false",
700                },
701                isolation_level.as_variant_str(),
702                &compute_instance.to_string(),
703            ])
704            .inc();
705        if !det.respond_immediately()
706            && isolation_level == &IsolationLevel::StrictSerializable
707            && real_time_recency_ts.is_none()
708        {
709            // Note down the difference between StrictSerializable and Serializable into a metric.
710            if let Some(strict) = det.timestamp_context.timestamp() {
711                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
712                    session,
713                    id_bundle,
714                    when,
715                    timeline_context,
716                    oracle_read_ts,
717                    real_time_recency_ts,
718                    &IsolationLevel::Serializable,
719                )?;
720
721                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
722                    self.metrics
723                        .timestamp_difference_for_strict_serializable_ms
724                        .with_label_values(&[compute_instance.to_string().as_str()])
725                        .observe(f64::cast_lossy(u64::from(
726                            strict.saturating_sub(*serializable),
727                        )));
728                }
729            }
730        }
731        if !det.respond_immediately()
732            && isolation_level.is_bounded_staleness()
733            && real_time_recency_ts.is_none()
734        {
735            // Note down the difference between BoundedStaleness and Serializable into a metric.
736            if let Some(bs_ts) = det.timestamp_context.timestamp() {
737                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
738                    session,
739                    id_bundle,
740                    when,
741                    timeline_context,
742                    oracle_read_ts,
743                    real_time_recency_ts,
744                    &IsolationLevel::Serializable,
745                )?;
746                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
747                    self.metrics
748                        .timestamp_difference_for_bounded_staleness_ms
749                        .with_label_values(&[compute_instance.to_string().as_str()])
750                        .observe(f64::cast_lossy(u64::from(
751                            serializable.saturating_sub(*bs_ts),
752                        )));
753                }
754            }
755        }
756        Ok((det, read_holds))
757    }
758
759    /// The largest timestamp not greater or equal to an element of `upper`.
760    ///
761    /// If no such timestamp exists, for example because `upper` contains only the
762    /// minimal timestamp, the return value is `Timestamp::minimum()`.
763    pub(crate) fn largest_not_in_advance_of_upper(
764        upper: &Antichain<mz_repr::Timestamp>,
765    ) -> mz_repr::Timestamp {
766        // We peek at the largest element not in advance of `upper`, which
767        // involves a subtraction. If `upper` contains a zero timestamp there
768        // is no "prior" answer, and we do not want to peek at it as it risks
769        // hanging awaiting the response to data that may never arrive.
770        if let Some(upper) = upper.as_option() {
771            upper.step_back().unwrap_or_else(Timestamp::minimum)
772        } else {
773            // A complete trace can be read in its final form with this time.
774            //
775            // This should only happen for literals that have no sources or sources that
776            // are known to have completed (non-tailed files for example).
777            Timestamp::MAX
778        }
779    }
780}
781
/// Information used when determining the timestamp for a query.
///
/// Records the chosen [`TimestampContext`] together with the frontiers, the
/// oracle and real-time-recency inputs (where used), and the solver
/// constraints, so that the choice can be inspected and explained afterwards.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination {
    /// The chosen timestamp context from `determine_timestamp`.
    pub timestamp_context: TimestampContext,
    /// The read frontier of all involved sources.
    pub since: Antichain<Timestamp>,
    /// The write frontier of all involved sources.
    ///
    /// [`TimestampDetermination::respond_immediately`] compares the chosen
    /// timestamp against this frontier.
    pub upper: Antichain<Timestamp>,
    /// The largest timestamp not in advance of `upper`.
    pub largest_not_in_advance_of_upper: Timestamp,
    /// The value of the timeline's oracle timestamp, if used.
    pub oracle_read_ts: Option<Timestamp>,
    /// The value of the session-local timestamp oracle's read timestamp, if used.
    pub session_oracle_read_ts: Option<Timestamp>,
    /// The value of the real-time recency timestamp, if used.
    pub real_time_recency_ts: Option<Timestamp>,
    /// The constraints used by the constraint-based solver.
    /// See the [`constraints`] module for more information.
    pub constraints: Constraints,
}
803
804impl TimestampDetermination {
805    pub fn respond_immediately(&self) -> bool {
806        match &self.timestamp_context {
807            TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
808                !self.upper.less_equal(chosen_ts)
809            }
810            TimestampContext::NoTimestamp => true,
811        }
812    }
813}
814
/// A timestamp determination together with per-source frontier details.
///
/// Its `Display` implementation renders a multi-line, human-readable report of
/// the determination, each source's frontiers, and the binding constraints.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation {
    /// The determination produced by `determine_timestamp`.
    pub determination: TimestampDetermination,
    /// Details about each source.
    pub sources: Vec<TimestampSource>,
    /// Wall time of the first statement executed in this transaction.
    pub session_wall_time: DateTime<Utc>,
    /// Cached value of `determination.respond_immediately()`.
    pub respond_immediately: bool,
}
827
/// Per-source frontier details included in a [`TimestampExplanation`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource {
    /// Name of the source, as printed in the explanation.
    pub name: String,
    /// The elements of the source's read frontier.
    pub read_frontier: Vec<Timestamp>,
    /// The elements of the source's write frontier.
    pub write_frontier: Vec<Timestamp>,
}
834
/// Values that can be formatted with knowledge of the timeline they belong to.
///
/// For example, `mz_repr::Timestamp` values in the `EpochMilliseconds` timeline
/// are rendered alongside a human-readable date.
pub trait DisplayableInTimeline {
    /// Formats `self` into `f`, optionally using `timeline` to choose the rendering.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Returns a wrapper whose `fmt::Display` implementation calls
    /// [`DisplayableInTimeline::fmt`] on `self` with `timeline`.
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
841
842impl DisplayableInTimeline for mz_repr::Timestamp {
843    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
844        if let Some(Timeline::EpochMilliseconds) = timeline {
845            let ts_ms: u64 = self.into();
846            if let Ok(ts_ms) = i64::try_from(ts_ms) {
847                if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
848                    return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
849                }
850            }
851        }
852        write!(f, "{:13}", self)
853    }
854}
855
856pub struct DisplayInTimeline<'a, T: ?Sized> {
857    t: &'a T,
858    timeline: Option<&'a Timeline>,
859}
860impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
861where
862    T: DisplayableInTimeline,
863{
864    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
865        self.t.fmt(self.timeline, f)
866    }
867}
868
869impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
870where
871    T: DisplayableInTimeline,
872{
873    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874        fmt::Display::fmt(&self, f)
875    }
876}
877
878impl fmt::Display for TimestampExplanation {
879    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
880        let timeline = self.determination.timestamp_context.timeline();
881        writeln!(
882            f,
883            "                query timestamp: {}",
884            self.determination
885                .timestamp_context
886                .timestamp_or_default()
887                .display(timeline)
888        )?;
889        if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
890            writeln!(
891                f,
892                "          oracle read timestamp: {}",
893                oracle_read_ts.display(timeline)
894            )?;
895        }
896        if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
897            writeln!(
898                f,
899                "  session oracle read timestamp: {}",
900                session_oracle_read_ts.display(timeline)
901            )?;
902        }
903        if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
904            writeln!(
905                f,
906                "    real time recency timestamp: {}",
907                real_time_recency_ts.display(timeline)
908            )?;
909        }
910        writeln!(
911            f,
912            "largest not in advance of upper: {}",
913            self.determination
914                .largest_not_in_advance_of_upper
915                .display(timeline),
916        )?;
917        writeln!(
918            f,
919            "                          upper:{:?}",
920            self.determination
921                .upper
922                .iter()
923                .map(|t| t.display(timeline))
924                .collect::<Vec<_>>()
925        )?;
926        writeln!(
927            f,
928            "                          since:{:?}",
929            self.determination
930                .since
931                .iter()
932                .map(|t| t.display(timeline))
933                .collect::<Vec<_>>()
934        )?;
935        writeln!(
936            f,
937            "        can respond immediately: {}",
938            self.respond_immediately
939        )?;
940        writeln!(f, "                       timeline: {:?}", &timeline)?;
941        writeln!(
942            f,
943            "              session wall time: {:13} ({})",
944            self.session_wall_time.timestamp_millis(),
945            self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
946        )?;
947
948        for source in &self.sources {
949            writeln!(f, "")?;
950            writeln!(f, "source {}:", source.name)?;
951            writeln!(
952                f,
953                "                  read frontier:{:?}",
954                source
955                    .read_frontier
956                    .iter()
957                    .map(|t| t.display(timeline))
958                    .collect::<Vec<_>>()
959            )?;
960            writeln!(
961                f,
962                "                 write frontier:{:?}",
963                source
964                    .write_frontier
965                    .iter()
966                    .map(|t| t.display(timeline))
967                    .collect::<Vec<_>>()
968            )?;
969        }
970
971        writeln!(f, "")?;
972        writeln!(f, "binding constraints:")?;
973        write!(f, "{}", self.determination.constraints.display(timeline))?;
974
975        Ok(())
976    }
977}
978
979/// Types and logic in support of a constraint-based approach to timestamp determination.
980mod constraints {
981
982    use core::fmt;
983    use std::fmt::Debug;
984
985    use differential_dataflow::lattice::Lattice;
986    use mz_storage_types::sources::Timeline;
987    use serde::{Deserialize, Serialize};
988    use timely::progress::{Antichain, Timestamp};
989
990    use mz_compute_types::ComputeInstanceId;
991    use mz_repr::GlobalId;
992    use mz_sql::session::vars::IsolationLevel;
993
994    use super::DisplayableInTimeline;
995
996    /// Constraints expressed on the timestamp of a query.
997    ///
998    /// The constraints are expressed on the minimum and maximum values,
999    /// resulting in a (possibly empty) interval of valid timestamps.
1000    ///
1001    /// The constraints may be redundant, in the interest of providing
1002    /// more complete explanations, but they may also be minimized at
1003    /// any point without altering their behavior by removing redundant
1004    /// constraints.
1005    ///
1006    /// When combined with a `Preference` one can determine an
1007    /// ideal timestamp to use.
1008    #[derive(Default, Serialize, Deserialize, Clone)]
1009    pub struct Constraints {
1010        /// Timestamps and reasons that impose an inclusive lower bound.
1011        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1012        /// Timestamps and reasons that impose an inclusive upper bound.
1013        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1014    }
1015
1016    impl DisplayableInTimeline for Constraints {
1017        fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1018            if !self.lower.is_empty() {
1019                writeln!(f, "lower:")?;
1020                for (ts, reason) in &self.lower {
1021                    let ts: Vec<_> = ts
1022                        .iter()
1023                        .map(|t| format!("{}", t.display(timeline)))
1024                        .collect();
1025                    writeln!(f, "  ({}): [{}]", reason, ts.join(", "))?;
1026                }
1027            }
1028            if !self.upper.is_empty() {
1029                writeln!(f, "upper:")?;
1030                for (ts, reason) in &self.upper {
1031                    let ts: Vec<_> = ts
1032                        .iter()
1033                        .map(|t| format!("{}", t.display(timeline)))
1034                        .collect();
1035                    writeln!(f, "  ({}): [{}]", reason, ts.join(", "))?;
1036                }
1037            }
1038            Ok(())
1039        }
1040    }
1041
1042    impl Debug for Constraints {
1043        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1044            self.display(None).fmt(f)?;
1045            Ok(())
1046        }
1047    }
1048
1049    impl Constraints {
1050        /// Remove constraints that are dominated by other constraints.
1051        ///
1052        /// This removes redundant constraints, without removing constraints
1053        /// that are "tight" in the sense that the interval would be
1054        /// meaningfully different without them.
1055        /// For example, two constraints at the same
1056        /// time will both be retained, in the interest of full information.
1057        /// But a lower bound constraint at time `t` will be removed if there is a
1058        /// constraint at time `t + 1` (or any larger time).
1059        pub fn minimize(&mut self) {
1060            // Establish the upper bound of lower constraints.
1061            let lower_frontier = self.lower_bound();
1062            // Retain constraints that intersect `lower_frontier`.
1063            self.lower.retain(|(anti, _)| {
1064                anti.iter()
1065                    .any(|time| lower_frontier.elements().contains(time))
1066            });
1067
1068            // Establish the lower bound of upper constraints.
1069            let upper_frontier = self.upper_bound();
1070            // Retain constraints that intersect `upper_frontier`.
1071            self.upper.retain(|(anti, _)| {
1072                anti.iter()
1073                    .any(|time| upper_frontier.elements().contains(time))
1074            });
1075        }
1076
1077        /// An antichain equal to the least upper bound of lower bounds.
1078        pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1079            let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1080            for (anti, _) in self.lower.iter() {
1081                lower = lower.join(anti);
1082            }
1083            lower
1084        }
1085        /// An antichain equal to the greatest lower bound of upper bounds.
1086        pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1087            self.upper
1088                .iter()
1089                .flat_map(|(anti, _)| anti.iter())
1090                .cloned()
1091                .collect()
1092        }
1093    }
1094
1095    /// An explanation of reasons for a timestamp constraint.
1096    #[derive(Serialize, Deserialize, Clone)]
1097    pub enum Reason {
1098        /// A compute input at a compute instance.
1099        /// This is something like an index or view
1100        /// that is maintained by compute.
1101        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1102        /// A storage input.
1103        StorageInput(Vec<GlobalId>),
1104        /// A specified isolation level and the timestamp it requires.
1105        IsolationLevel(IsolationLevel),
1106        /// Real-time recency may constrain the timestamp from below.
1107        RealTimeRecency,
1108        /// The query expressed its own constraint on the timestamp.
1109        QueryAsOf,
1110    }
1111
1112    impl fmt::Display for Reason {
1113        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1114            match self {
1115                Reason::ComputeInput(ids) => {
1116                    let formatted: Vec<_> =
1117                        ids.iter().map(|(c, g)| format!("({}, {})", c, g)).collect();
1118                    write!(f, "Indexed inputs: [{}]", formatted.join(", "))
1119                }
1120                Reason::StorageInput(ids) => {
1121                    let formatted: Vec<_> = ids.iter().map(|g| format!("{}", g)).collect();
1122                    write!(f, "Storage inputs: [{}]", formatted.join(", "))
1123                }
1124                Reason::IsolationLevel(level) => {
1125                    write!(f, "Isolation level: {:?}", level)
1126                }
1127                Reason::RealTimeRecency => {
1128                    write!(f, "Real-time recency")
1129                }
1130                Reason::QueryAsOf => {
1131                    write!(f, "Query's AS OF")
1132                }
1133            }
1134        }
1135    }
1136
1137    /// Given an interval [read, write) of timestamp options,
1138    /// this expresses a preference for either end of the spectrum.
1139    pub enum Preference {
1140        /// Prefer the greatest timestamp immediately available.
1141        ///
1142        /// This considers the immediate inputs to a query and
1143        /// selects the greatest timestamp not greater or equal
1144        /// to any of their write frontiers.
1145        ///
1146        /// The preference only relates to immediate query inputs,
1147        /// but it could be extended to transitive inputs as well.
1148        /// For example, one could imagine preferring the freshest
1149        /// data known to be ingested into Materialize, under the
1150        /// premise that those answers should soon become available,
1151        /// and may be more fresh than the immediate inputs.
1152        FreshestAvailable,
1153        /// Prefer the least valid timestamp.
1154        ///
1155        /// This is useful when one has no expressed freshness
1156        /// constraints, and wants to minimally impact others.
1157        /// For example, `AS OF AT LEAST <time>`.
1158        StalestValid,
1159    }
1160}