Skip to main content

mz_adapter/coord/
timestamp_selection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for selecting timestamps for various operations on collections.
11
12use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::cast::CastLossy;
21use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
22use mz_sql::plan::QueryWhen;
23use mz_sql::session::vars::IsolationLevel;
24use mz_storage_types::sources::Timeline;
25use serde::{Deserialize, Serialize};
26use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
27
28use crate::AdapterError;
29use crate::catalog::CatalogState;
30use crate::coord::Coordinator;
31use crate::coord::id_bundle::CollectionIdBundle;
32use crate::coord::read_policy::ReadHolds;
33use crate::coord::timeline::TimelineContext;
34use crate::session::Session;
35
/// The timeline and timestamp context of a read.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext<T> {
    /// Read is executed in a specific timeline with a specific timestamp.
    TimelineTimestamp {
        /// The timeline the read is executed in.
        timeline: Timeline,
        /// The timestamp that was chosen for a read. This can differ from the
        /// `oracle_ts` when collections are not readable at the (linearized)
        /// timestamp for the oracle. In those cases (when the chosen timestamp
        /// is further ahead than the oracle timestamp) we have to delay
        /// returning peek results until the timestamp oracle is also
        /// sufficiently advanced.
        chosen_ts: T,
        /// The timestamp that would have been chosen for the read by the
        /// (linearized) timestamp oracle. In most cases this will be picked as
        /// the `chosen_ts`. `None` when no oracle timestamp was obtained for
        /// the read.
        oracle_ts: Option<T>,
    },
    /// Read is executed without a timeline or timestamp.
    NoTimestamp,
}
57
58impl<T: TimestampManipulation> TimestampContext<T> {
59    /// Creates a `TimestampContext` from a timestamp and `TimelineContext`.
60    pub fn from_timeline_context(
61        chosen_ts: T,
62        oracle_ts: Option<T>,
63        transaction_timeline: Option<Timeline>,
64        timeline_context: &TimelineContext,
65    ) -> TimestampContext<T> {
66        match timeline_context {
67            TimelineContext::TimelineDependent(timeline) => {
68                if let Some(transaction_timeline) = transaction_timeline {
69                    assert_eq!(timeline, &transaction_timeline);
70                }
71                Self::TimelineTimestamp {
72                    timeline: timeline.clone(),
73                    chosen_ts,
74                    oracle_ts,
75                }
76            }
77            TimelineContext::TimestampDependent => {
78                // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
79                Self::TimelineTimestamp {
80                    timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
81                    chosen_ts,
82                    oracle_ts,
83                }
84            }
85            TimelineContext::TimestampIndependent => Self::NoTimestamp,
86        }
87    }
88
89    /// The timeline belonging to this context, if one exists.
90    pub fn timeline(&self) -> Option<&Timeline> {
91        self.timeline_timestamp().map(|tt| tt.0)
92    }
93
94    /// The timestamp belonging to this context, if one exists.
95    pub fn timestamp(&self) -> Option<&T> {
96        self.timeline_timestamp().map(|tt| tt.1)
97    }
98
99    /// The timeline and timestamp belonging to this context, if one exists.
100    pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
101        match self {
102            Self::TimelineTimestamp {
103                timeline,
104                chosen_ts,
105                ..
106            } => Some((timeline, chosen_ts)),
107            Self::NoTimestamp => None,
108        }
109    }
110
111    /// The timestamp belonging to this context, or a sensible default if one does not exists.
112    pub fn timestamp_or_default(&self) -> T {
113        match self {
114            Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
115            // Anything without a timestamp is given the maximum possible timestamp to indicate
116            // that they have been closed up until the end of time. This allows us to SUBSCRIBE to
117            // static views.
118            Self::NoTimestamp => T::maximum(),
119        }
120    }
121
122    /// Whether or not the context contains a timestamp.
123    pub fn contains_timestamp(&self) -> bool {
124        self.timestamp().is_some()
125    }
126
127    /// Converts this `TimestampContext` to an `Antichain`.
128    pub fn antichain(&self) -> Antichain<T> {
129        Antichain::from_elem(self.timestamp_or_default())
130    }
131}
132
133#[async_trait(?Send)]
134impl TimestampProvider for Coordinator {
135    /// Reports a collection's current read frontier.
136    fn compute_read_frontier(
137        &self,
138        instance: ComputeInstanceId,
139        id: GlobalId,
140    ) -> Antichain<Timestamp> {
141        self.controller
142            .compute
143            .collection_frontiers(id, Some(instance))
144            .expect("id does not exist")
145            .read_frontier
146    }
147
148    /// Reports a collection's current write frontier.
149    fn compute_write_frontier(
150        &self,
151        instance: ComputeInstanceId,
152        id: GlobalId,
153    ) -> Antichain<Timestamp> {
154        self.controller
155            .compute
156            .collection_frontiers(id, Some(instance))
157            .expect("id does not exist")
158            .write_frontier
159    }
160
161    fn storage_frontiers(
162        &self,
163        ids: Vec<GlobalId>,
164    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
165        self.controller
166            .storage
167            .collections_frontiers(ids)
168            .expect("missing collections")
169    }
170
171    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
172        self.acquire_read_holds(id_bundle)
173    }
174
175    fn catalog_state(&self) -> &CatalogState {
176        self.catalog().state()
177    }
178}
179
/// A timestamp determination, which includes the timestamp, constraints, and session oracle read
/// timestamp.
///
/// This is what `TimestampProvider::determine_timestamp_via_constraints` produces, before it is
/// combined with frontier and timeline information into a [`TimestampDetermination`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination<T> {
    /// The timestamp selected from the constraints and preferences.
    pub timestamp: T,
    /// The (minimized) constraints that were applied when selecting `timestamp`.
    pub constraints: Constraints,
    /// The read timestamp of the session-local timestamp oracle, if one was consulted.
    /// `determine_timestamp_via_constraints` sets this only under Strong Session Serializable.
    pub session_oracle_read_ts: Option<T>,
}
188
189#[async_trait(?Send)]
190pub trait TimestampProvider {
191    fn compute_read_frontier(
192        &self,
193        instance: ComputeInstanceId,
194        id: GlobalId,
195    ) -> Antichain<Timestamp>;
196    fn compute_write_frontier(
197        &self,
198        instance: ComputeInstanceId,
199        id: GlobalId,
200    ) -> Antichain<Timestamp>;
201
202    /// Returns the implied capability (since) and write frontier (upper) for
203    /// the specified storage collections.
204    fn storage_frontiers(
205        &self,
206        ids: Vec<GlobalId>,
207    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
208
209    fn catalog_state(&self) -> &CatalogState;
210
211    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
212        let timeline = match timeline_context {
213            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
214            // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
215            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
216            TimelineContext::TimestampIndependent => None,
217        };
218
219        timeline
220    }
221
222    /// Returns true if-and-only-if the given configuration needs a linearized
223    /// read timestamp from a timestamp oracle.
224    ///
225    /// This assumes that the query happens in the context of a timeline. If
226    /// there is no timeline, we cannot and don't have to get a linearized read
227    /// timestamp.
228    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
229        // When we're in the context of a timeline (assumption) and one of these
230        // scenarios hold, we need to use a linearized read timestamp:
231        // - The isolation level is Strict Serializable and the `when` allows us to use the
232        //   the timestamp oracle (ex: queries with no AS OF).
233        // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries).
234        when.must_advance_to_timeline_ts()
235            || (when.can_advance_to_timeline_ts()
236                && matches!(
237                    isolation_level,
238                    IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
239                ))
240    }
241
242    /// Uses constraints and preferences to determine a timestamp for a query.
243    /// Returns the determined timestamp, the constraints that were applied, and
244    /// session_oracle_read_ts.
245    fn determine_timestamp_via_constraints(
246        session: &Session,
247        read_holds: &ReadHolds<Timestamp>,
248        id_bundle: &CollectionIdBundle,
249        when: &QueryWhen,
250        oracle_read_ts: Option<Timestamp>,
251        real_time_recency_ts: Option<Timestamp>,
252        isolation_level: &IsolationLevel,
253        timeline: &Option<Timeline>,
254        largest_not_in_advance_of_upper: Timestamp,
255    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
256        use constraints::{Constraints, Preference, Reason};
257
258        let mut session_oracle_read_ts = None;
259        // We start by establishing the hard constraints that must be applied to timestamp determination.
260        // These constraints are derived from the input arguments, and properties of the collections involved.
261        // TODO: Many of the constraints are expressed obliquely, and could be made more direct.
262        let constraints = {
263            // Constraints we will populate through a sequence of opinions.
264            let mut constraints = Constraints::default();
265
266            // First, we have validity constraints from the `id_bundle` argument which indicates
267            // which collections we are reading from.
268            // TODO: Refine the detail about which identifiers are binding and which are not.
269            // TODO(dov): It's not entirely clear to me that there ever would be a non
270            // binding constraint introduced by the `id_bundle`. We should revisit this.
271            let since = read_holds.least_valid_read();
272            let storage = id_bundle
273                .storage_ids
274                .iter()
275                .cloned()
276                .collect::<Vec<GlobalId>>();
277            if !storage.is_empty() {
278                constraints
279                    .lower
280                    .push((since.clone(), Reason::StorageInput(storage)));
281            }
282            let compute = id_bundle
283                .compute_ids
284                .iter()
285                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
286                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
287            if !compute.is_empty() {
288                constraints
289                    .lower
290                    .push((since.clone(), Reason::ComputeInput(compute)));
291            }
292
293            // The query's `when` may indicates a specific timestamp we must advance to, or a specific value we must use.
294            if let Some(ts) = when.advance_to_timestamp() {
295                constraints
296                    .lower
297                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
298                // If the query is at a specific timestamp, we must introduce an upper bound as well.
299                if when.constrains_upper() {
300                    constraints
301                        .upper
302                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
303                }
304            }
305
306            // The specification of an `oracle_read_ts` may indicates that we must advance to it,
307            // except in one isolation mode, or if `when` does not indicate that we should.
308            // At the moment, only `QueryWhen::FreshestTableWrite` indicates that we should.
309            // TODO: Should this just depend on the isolation level?
310            if let Some(timestamp) = &oracle_read_ts {
311                if isolation_level != &IsolationLevel::StrongSessionSerializable
312                    || when.must_advance_to_timeline_ts()
313                {
314                    // When specification of an `oracle_read_ts` is required, we must advance to it.
315                    // If it's not present, lets bail out.
316                    constraints.lower.push((
317                        Antichain::from_elem(*timestamp),
318                        Reason::IsolationLevel(*isolation_level),
319                    ));
320                }
321            }
322
323            // If a real time recency timestamp is supplied, we must advance to it.
324            if let Some(real_time_recency_ts) = real_time_recency_ts {
325                assert!(
326                    session.vars().real_time_recency()
327                        && isolation_level == &IsolationLevel::StrictSerializable,
328                    "real time recency timestamp should only be supplied when real time recency \
329                                is enabled and the isolation level is strict serializable"
330                );
331                constraints.lower.push((
332                    Antichain::from_elem(real_time_recency_ts),
333                    Reason::RealTimeRecency,
334                ));
335            }
336
337            // If we are operating in Strong Session Serializable, we use an alternate timestamp lower bound.
338            if isolation_level == &IsolationLevel::StrongSessionSerializable {
339                if let Some(timeline) = &timeline {
340                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
341                        let session_ts = oracle.read_ts();
342                        constraints.lower.push((
343                            Antichain::from_elem(session_ts),
344                            Reason::IsolationLevel(*isolation_level),
345                        ));
346                        session_oracle_read_ts = Some(session_ts);
347                    }
348
349                    // When advancing the read timestamp under Strong Session Serializable, there is a
350                    // trade-off to make between freshness and latency. We can choose a timestamp close the
351                    // `upper`, but then later queries might block if the `upper` is too far into the
352                    // future. We can chose a timestamp close to the current time, but then we may not be
353                    // getting results that are as fresh as possible. As a heuristic, we choose the minimum
354                    // of now and the upper, where we use the global timestamp oracle read timestamp as a
355                    // proxy for now. If upper > now, then we choose now and prevent blocking future
356                    // queries. If upper < now, then we choose the upper and prevent blocking the current
357                    // query.
358                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
359                        let mut advance_to = largest_not_in_advance_of_upper;
360                        if let Some(oracle_read_ts) = oracle_read_ts {
361                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
362                        }
363                        constraints.lower.push((
364                            Antichain::from_elem(advance_to),
365                            Reason::IsolationLevel(*isolation_level),
366                        ));
367                    }
368                }
369            }
370
371            constraints.minimize();
372            constraints
373        };
374
375        // Next we establish the preferences that we would like to apply to timestamp determination.
376        // Generally, we want to choose the freshest timestamp possible, although there are exceptions
377        // when we either want a maximally *stale* timestamp, or we want to protect other queries from
378        // a recklessly advanced timestamp.
379        let preferences = {
380            // Counter-intuitively, the only `when` that allows `can_advance_to_upper` is `Immediately`,
381            // and not `FreshestTableWrite`. This is because `FreshestTableWrite` instead imposes a lower
382            // bound through the `oracle_read_ts`, and then requires the stalest valid timestamp.
383
384            if when.can_advance_to_upper()
385                && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
386            {
387                Preference::FreshestAvailable
388            } else {
389                Preference::StalestValid
390            }
391
392            // TODO: `StrongSessionSerializable` has a different set of preferences that starts to tease
393            // out the trade-off between freshness and responsiveness. I think we don't yet know enough
394            // to properly frame these preferences, though they are clearly aimed at the right concerns.
395        };
396
397        // Determine a candidate based on constraints and preferences.
398        let constraint_candidate = {
399            let mut candidate = Timestamp::minimum();
400            // Note: These `advance_by` calls are no-ops if the given frontier is `[]`.
401            candidate.advance_by(constraints.lower_bound().borrow());
402            // If we have a preference to be the freshest available, advance to the minimum
403            // of the upper bound constraints and the `largest_not_in_advance_of_upper`.
404            if let Preference::FreshestAvailable = preferences {
405                let mut upper_bound = constraints.upper_bound();
406                upper_bound.insert(largest_not_in_advance_of_upper);
407                candidate.advance_by(upper_bound.borrow());
408            }
409            // If the candidate is strictly outside the constraints, we didn't have a viable
410            // timestamp. This can happen e.g. when the query has AS OF, or when the lower bound is
411            // `[]`.
412            if !constraints.lower_bound().less_equal(&candidate)
413                || constraints.upper_bound().less_than(&candidate)
414            {
415                return Err(AdapterError::ImpossibleTimestampConstraints {
416                    constraints: constraints.display(timeline.as_ref()).to_string(),
417                });
418            } else {
419                candidate
420            }
421        };
422
423        Ok(RawTimestampDetermination {
424            timestamp: constraint_candidate,
425            constraints,
426            session_oracle_read_ts,
427        })
428    }
429
430    /// Determines the timestamp for a query.
431    ///
432    /// Timestamp determination may fail due to the restricted validity of
433    /// traces. Each has a `since` and `upper` frontier, and are only valid
434    /// after `since` and sure to be available not after `upper`.
435    ///
436    /// The timeline that `id_bundle` belongs to is also returned, if one exists.
437    fn determine_timestamp_for(
438        &self,
439        session: &Session,
440        id_bundle: &CollectionIdBundle,
441        when: &QueryWhen,
442        timeline_context: &TimelineContext,
443        oracle_read_ts: Option<Timestamp>,
444        real_time_recency_ts: Option<Timestamp>,
445        isolation_level: &IsolationLevel,
446    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
447        // First, we acquire read holds that will ensure the queried collections
448        // stay queryable at the chosen timestamp.
449        let read_holds = self.acquire_read_holds(id_bundle);
450
451        let upper = self.least_valid_write(id_bundle);
452
453        Self::determine_timestamp_for_inner(
454            session,
455            id_bundle,
456            when,
457            timeline_context,
458            oracle_read_ts,
459            real_time_recency_ts,
460            isolation_level,
461            read_holds,
462            upper,
463        )
464    }
465
466    /// Same as determine_timestamp_for, but read_holds and least_valid_write are already passed in.
467    fn determine_timestamp_for_inner(
468        session: &Session,
469        id_bundle: &CollectionIdBundle,
470        when: &QueryWhen,
471        timeline_context: &TimelineContext,
472        oracle_read_ts: Option<Timestamp>,
473        real_time_recency_ts: Option<Timestamp>,
474        isolation_level: &IsolationLevel,
475        read_holds: ReadHolds<Timestamp>,
476        upper: Antichain<Timestamp>,
477    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
478        let timeline = Self::get_timeline(timeline_context);
479        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
480        let since = read_holds.least_valid_read();
481
482        // If the `since` is empty, then timestamp determination would fail. Let's return a more
483        // specific error in this case: Empty `since` frontiers happen here when collections were
484        // dropped concurrently with sequencing the query.
485        if since.is_empty() {
486            // Figure out what made the since frontier empty.
487            let mut unreadable_collections = Vec::new();
488            for (coll_id, hold) in read_holds.storage_holds {
489                if hold.since().is_empty() {
490                    unreadable_collections.push(coll_id);
491                }
492            }
493            for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
494                if hold.since().is_empty() {
495                    unreadable_collections.push(coll_id);
496                }
497            }
498            return Err(AdapterError::CollectionUnreadable {
499                id: unreadable_collections.into_iter().join(", "),
500            });
501        }
502
503        let raw_determination = Self::determine_timestamp_via_constraints(
504            session,
505            &read_holds,
506            id_bundle,
507            when,
508            oracle_read_ts,
509            real_time_recency_ts,
510            isolation_level,
511            &timeline,
512            largest_not_in_advance_of_upper,
513        )?;
514
515        let timestamp_context = TimestampContext::from_timeline_context(
516            raw_determination.timestamp,
517            oracle_read_ts,
518            timeline,
519            timeline_context,
520        );
521
522        let determination = TimestampDetermination {
523            timestamp_context,
524            since,
525            upper,
526            largest_not_in_advance_of_upper,
527            oracle_read_ts,
528            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
529            real_time_recency_ts,
530            constraints: raw_determination.constraints,
531        };
532
533        Ok((determination, read_holds))
534    }
535
536    /// Acquires [ReadHolds], for the given `id_bundle` at the earliest possible
537    /// times.
538    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
539
540    /// The smallest common valid write frontier among the specified collections.
541    ///
542    /// Times that are not greater or equal to this frontier are complete for all collections
543    /// identified as arguments.
544    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
545        let mut upper = Antichain::new();
546        {
547            for (_id, _since, collection_upper) in
548                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
549            {
550                upper.extend(collection_upper);
551            }
552        }
553        {
554            for (instance, compute_ids) in &id_bundle.compute_ids {
555                for id in compute_ids.iter() {
556                    upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
557                }
558            }
559        }
560        upper
561    }
562
563    /// Returns `least_valid_write` - 1, i.e., each time in `least_valid_write` stepped back in a
564    /// saturating way.
565    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
566        let mut frontier = Antichain::new();
567        for t in self.least_valid_write(id_bundle) {
568            frontier.insert(t.step_back().unwrap_or(t));
569        }
570        frontier
571    }
572}
573
574impl Coordinator {
575    pub(crate) async fn oracle_read_ts(
576        &self,
577        session: &Session,
578        timeline_ctx: &TimelineContext,
579        when: &QueryWhen,
580    ) -> Option<Timestamp> {
581        let isolation_level = session.vars().transaction_isolation().clone();
582        let timeline = Coordinator::get_timeline(timeline_ctx);
583        let needs_linearized_read_ts =
584            Coordinator::needs_linearized_read_ts(&isolation_level, when);
585
586        let oracle_read_ts = match timeline {
587            Some(timeline) if needs_linearized_read_ts => {
588                let timestamp_oracle = self.get_timestamp_oracle(&timeline);
589                Some(timestamp_oracle.read_ts().await)
590            }
591            Some(_) | None => None,
592        };
593
594        oracle_read_ts
595    }
596
597    /// Determines the timestamp for a query, acquires read holds that ensure the
598    /// query remains executable at that time, and returns those.
599    /// The caller is responsible for eventually dropping those read holds.
600    #[mz_ore::instrument(level = "debug")]
601    pub(crate) fn determine_timestamp(
602        &self,
603        session: &Session,
604        id_bundle: &CollectionIdBundle,
605        when: &QueryWhen,
606        compute_instance: ComputeInstanceId,
607        timeline_context: &TimelineContext,
608        oracle_read_ts: Option<Timestamp>,
609        real_time_recency_ts: Option<mz_repr::Timestamp>,
610    ) -> Result<
611        (
612            TimestampDetermination<mz_repr::Timestamp>,
613            ReadHolds<mz_repr::Timestamp>,
614        ),
615        AdapterError,
616    > {
617        let isolation_level = session.vars().transaction_isolation();
618        let (det, read_holds) = self.determine_timestamp_for(
619            session,
620            id_bundle,
621            when,
622            timeline_context,
623            oracle_read_ts,
624            real_time_recency_ts,
625            isolation_level,
626        )?;
627        self.metrics
628            .determine_timestamp
629            .with_label_values(&[
630                match det.respond_immediately() {
631                    true => "true",
632                    false => "false",
633                },
634                isolation_level.as_str(),
635                &compute_instance.to_string(),
636            ])
637            .inc();
638        if !det.respond_immediately()
639            && isolation_level == &IsolationLevel::StrictSerializable
640            && real_time_recency_ts.is_none()
641        {
642            // Note down the difference between StrictSerializable and Serializable into a metric.
643            if let Some(strict) = det.timestamp_context.timestamp() {
644                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
645                    session,
646                    id_bundle,
647                    when,
648                    timeline_context,
649                    oracle_read_ts,
650                    real_time_recency_ts,
651                    &IsolationLevel::Serializable,
652                )?;
653
654                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
655                    self.metrics
656                        .timestamp_difference_for_strict_serializable_ms
657                        .with_label_values(&[compute_instance.to_string().as_str()])
658                        .observe(f64::cast_lossy(u64::from(
659                            strict.saturating_sub(*serializable),
660                        )));
661                }
662            }
663        }
664        Ok((det, read_holds))
665    }
666
667    /// The largest timestamp not greater or equal to an element of `upper`.
668    ///
669    /// If no such timestamp exists, for example because `upper` contains only the
670    /// minimal timestamp, the return value is `Timestamp::minimum()`.
671    pub(crate) fn largest_not_in_advance_of_upper(
672        upper: &Antichain<mz_repr::Timestamp>,
673    ) -> mz_repr::Timestamp {
674        // We peek at the largest element not in advance of `upper`, which
675        // involves a subtraction. If `upper` contains a zero timestamp there
676        // is no "prior" answer, and we do not want to peek at it as it risks
677        // hanging awaiting the response to data that may never arrive.
678        if let Some(upper) = upper.as_option() {
679            upper.step_back().unwrap_or_else(Timestamp::minimum)
680        } else {
681            // A complete trace can be read in its final form with this time.
682            //
683            // This should only happen for literals that have no sources or sources that
684            // are known to have completed (non-tailed files for example).
685            Timestamp::MAX
686        }
687    }
688}
689
/// Information used when determining the timestamp for a query.
///
/// Records the chosen timestamp context together with the frontiers, oracle
/// timestamps, and constraints that informed the choice.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp context from `determine_timestamp`.
    pub timestamp_context: TimestampContext<T>,
    /// The least valid read frontier (since) of the read holds acquired on all
    /// involved collections (storage and compute).
    pub since: Antichain<T>,
    /// The least valid write frontier (upper) of all involved collections
    /// (storage and compute).
    pub upper: Antichain<T>,
    /// The largest timestamp not in advance of upper.
    pub largest_not_in_advance_of_upper: T,
    /// The value of the timeline's oracle timestamp, if used.
    pub oracle_read_ts: Option<T>,
    /// The value of the session local timestamp oracle's read timestamp, if used.
    pub session_oracle_read_ts: Option<T>,
    /// The value of the real time recency timestamp, if used.
    pub real_time_recency_ts: Option<T>,
    /// The constraints used by the constraint based solver.
    /// See the [`constraints`] module for more information.
    pub constraints: Constraints,
}
711
712impl<T: TimestampManipulation> TimestampDetermination<T> {
713    pub fn respond_immediately(&self) -> bool {
714        match &self.timestamp_context {
715            TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
716                !self.upper.less_equal(chosen_ts)
717            }
718            TimestampContext::NoTimestamp => true,
719        }
720    }
721}
722
723/// Information used when determining the timestamp for a query.
724#[derive(Clone, Debug, Serialize, Deserialize)]
725pub struct TimestampExplanation<T> {
726    /// The chosen timestamp from `determine_timestamp`.
727    pub determination: TimestampDetermination<T>,
728    /// Details about each source.
729    pub sources: Vec<TimestampSource<T>>,
730    /// Wall time of first statement executed in this transaction
731    pub session_wall_time: DateTime<Utc>,
732    /// Cached value of determination.respond_immediately()
733    pub respond_immediately: bool,
734}
735
736#[derive(Clone, Debug, Serialize, Deserialize)]
737pub struct TimestampSource<T> {
738    pub name: String,
739    pub read_frontier: Vec<T>,
740    pub write_frontier: Vec<T>,
741}
742
743pub trait DisplayableInTimeline {
744    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
745    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
746        DisplayInTimeline { t: self, timeline }
747    }
748}
749
750impl DisplayableInTimeline for mz_repr::Timestamp {
751    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
752        if let Some(Timeline::EpochMilliseconds) = timeline {
753            let ts_ms: u64 = self.into();
754            if let Ok(ts_ms) = i64::try_from(ts_ms) {
755                if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
756                    return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
757                }
758            }
759        }
760        write!(f, "{:13}", self)
761    }
762}
763
764pub struct DisplayInTimeline<'a, T: ?Sized> {
765    t: &'a T,
766    timeline: Option<&'a Timeline>,
767}
768impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
769where
770    T: DisplayableInTimeline,
771{
772    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
773        self.t.fmt(self.timeline, f)
774    }
775}
776
777impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
778where
779    T: DisplayableInTimeline,
780{
781    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
782        fmt::Display::fmt(&self, f)
783    }
784}
785
786impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
787    for TimestampExplanation<T>
788{
789    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
790        let timeline = self.determination.timestamp_context.timeline();
791        writeln!(
792            f,
793            "                query timestamp: {}",
794            self.determination
795                .timestamp_context
796                .timestamp_or_default()
797                .display(timeline)
798        )?;
799        if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
800            writeln!(
801                f,
802                "          oracle read timestamp: {}",
803                oracle_read_ts.display(timeline)
804            )?;
805        }
806        if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
807            writeln!(
808                f,
809                "  session oracle read timestamp: {}",
810                session_oracle_read_ts.display(timeline)
811            )?;
812        }
813        if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
814            writeln!(
815                f,
816                "    real time recency timestamp: {}",
817                real_time_recency_ts.display(timeline)
818            )?;
819        }
820        writeln!(
821            f,
822            "largest not in advance of upper: {}",
823            self.determination
824                .largest_not_in_advance_of_upper
825                .display(timeline),
826        )?;
827        writeln!(
828            f,
829            "                          upper:{:?}",
830            self.determination
831                .upper
832                .iter()
833                .map(|t| t.display(timeline))
834                .collect::<Vec<_>>()
835        )?;
836        writeln!(
837            f,
838            "                          since:{:?}",
839            self.determination
840                .since
841                .iter()
842                .map(|t| t.display(timeline))
843                .collect::<Vec<_>>()
844        )?;
845        writeln!(
846            f,
847            "        can respond immediately: {}",
848            self.respond_immediately
849        )?;
850        writeln!(f, "                       timeline: {:?}", &timeline)?;
851        writeln!(
852            f,
853            "              session wall time: {:13} ({})",
854            self.session_wall_time.timestamp_millis(),
855            self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
856        )?;
857
858        for source in &self.sources {
859            writeln!(f, "")?;
860            writeln!(f, "source {}:", source.name)?;
861            writeln!(
862                f,
863                "                  read frontier:{:?}",
864                source
865                    .read_frontier
866                    .iter()
867                    .map(|t| t.display(timeline))
868                    .collect::<Vec<_>>()
869            )?;
870            writeln!(
871                f,
872                "                 write frontier:{:?}",
873                source
874                    .write_frontier
875                    .iter()
876                    .map(|t| t.display(timeline))
877                    .collect::<Vec<_>>()
878            )?;
879        }
880
881        writeln!(f, "")?;
882        writeln!(f, "binding constraints:")?;
883        write!(f, "{}", self.determination.constraints.display(timeline))?;
884
885        Ok(())
886    }
887}
888
889/// Types and logic in support of a constraint-based approach to timestamp determination.
890mod constraints {
891
892    use core::fmt;
893    use std::fmt::Debug;
894
895    use differential_dataflow::lattice::Lattice;
896    use mz_storage_types::sources::Timeline;
897    use serde::{Deserialize, Serialize};
898    use timely::progress::{Antichain, Timestamp};
899
900    use mz_compute_types::ComputeInstanceId;
901    use mz_repr::GlobalId;
902    use mz_sql::session::vars::IsolationLevel;
903
904    use super::DisplayableInTimeline;
905
906    /// Constraints expressed on the timestamp of a query.
907    ///
908    /// The constraints are expressed on the minimum and maximum values,
909    /// resulting in a (possibly empty) interval of valid timestamps.
910    ///
911    /// The constraints may be redundant, in the interest of providing
912    /// more complete explanations, but they may also be minimized at
913    /// any point without altering their behavior by removing redundant
914    /// constraints.
915    ///
916    /// When combined with a `Preference` one can determine an
917    /// ideal timestamp to use.
918    #[derive(Default, Serialize, Deserialize, Clone)]
919    pub struct Constraints {
920        /// Timestamps and reasons that impose an inclusive lower bound.
921        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
922        /// Timestamps and reasons that impose an inclusive upper bound.
923        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
924    }
925
926    impl DisplayableInTimeline for Constraints {
927        fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
928            if !self.lower.is_empty() {
929                writeln!(f, "lower:")?;
930                for (ts, reason) in &self.lower {
931                    let ts: Vec<_> = ts
932                        .iter()
933                        .map(|t| format!("{}", t.display(timeline)))
934                        .collect();
935                    writeln!(f, "  ({}): [{}]", reason, ts.join(", "))?;
936                }
937            }
938            if !self.upper.is_empty() {
939                writeln!(f, "upper:")?;
940                for (ts, reason) in &self.upper {
941                    let ts: Vec<_> = ts
942                        .iter()
943                        .map(|t| format!("{}", t.display(timeline)))
944                        .collect();
945                    writeln!(f, "  ({}): [{}]", reason, ts.join(", "))?;
946                }
947            }
948            Ok(())
949        }
950    }
951
952    impl Debug for Constraints {
953        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
954            self.display(None).fmt(f)?;
955            Ok(())
956        }
957    }
958
959    impl Constraints {
960        /// Remove constraints that are dominated by other constraints.
961        ///
962        /// This removes redundant constraints, without removing constraints
963        /// that are "tight" in the sense that the interval would be
964        /// meaningfully different without them.
965        /// For example, two constraints at the same
966        /// time will both be retained, in the interest of full information.
967        /// But a lower bound constraint at time `t` will be removed if there is a
968        /// constraint at time `t + 1` (or any larger time).
969        pub fn minimize(&mut self) {
970            // Establish the upper bound of lower constraints.
971            let lower_frontier = self.lower_bound();
972            // Retain constraints that intersect `lower_frontier`.
973            self.lower.retain(|(anti, _)| {
974                anti.iter()
975                    .any(|time| lower_frontier.elements().contains(time))
976            });
977
978            // Establish the lower bound of upper constraints.
979            let upper_frontier = self.upper_bound();
980            // Retain constraints that intersect `upper_frontier`.
981            self.upper.retain(|(anti, _)| {
982                anti.iter()
983                    .any(|time| upper_frontier.elements().contains(time))
984            });
985        }
986
987        /// An antichain equal to the least upper bound of lower bounds.
988        pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
989            let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
990            for (anti, _) in self.lower.iter() {
991                lower = lower.join(anti);
992            }
993            lower
994        }
995        /// An antichain equal to the greatest lower bound of upper bounds.
996        pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
997            self.upper
998                .iter()
999                .flat_map(|(anti, _)| anti.iter())
1000                .cloned()
1001                .collect()
1002        }
1003    }
1004
1005    /// An explanation of reasons for a timestamp constraint.
1006    #[derive(Serialize, Deserialize, Clone)]
1007    pub enum Reason {
1008        /// A compute input at a compute instance.
1009        /// This is something like an index or view
1010        /// that is maintained by compute.
1011        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1012        /// A storage input.
1013        StorageInput(Vec<GlobalId>),
1014        /// A specified isolation level and the timestamp it requires.
1015        IsolationLevel(IsolationLevel),
1016        /// Real-time recency may constrain the timestamp from below.
1017        RealTimeRecency,
1018        /// The query expressed its own constraint on the timestamp.
1019        QueryAsOf,
1020    }
1021
1022    impl fmt::Display for Reason {
1023        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1024            match self {
1025                Reason::ComputeInput(ids) => {
1026                    let formatted: Vec<_> =
1027                        ids.iter().map(|(c, g)| format!("({}, {})", c, g)).collect();
1028                    write!(f, "Indexed inputs: [{}]", formatted.join(", "))
1029                }
1030                Reason::StorageInput(ids) => {
1031                    let formatted: Vec<_> = ids.iter().map(|g| format!("{}", g)).collect();
1032                    write!(f, "Storage inputs: [{}]", formatted.join(", "))
1033                }
1034                Reason::IsolationLevel(level) => {
1035                    write!(f, "Isolation level: {:?}", level)
1036                }
1037                Reason::RealTimeRecency => {
1038                    write!(f, "Real-time recency")
1039                }
1040                Reason::QueryAsOf => {
1041                    write!(f, "Query's AS OF")
1042                }
1043            }
1044        }
1045    }
1046
1047    /// Given an interval [read, write) of timestamp options,
1048    /// this expresses a preference for either end of the spectrum.
1049    pub enum Preference {
1050        /// Prefer the greatest timestamp immediately available.
1051        ///
1052        /// This considers the immediate inputs to a query and
1053        /// selects the greatest timestamp not greater or equal
1054        /// to any of their write frontiers.
1055        ///
1056        /// The preference only relates to immediate query inputs,
1057        /// but it could be extended to transitive inputs as well.
1058        /// For example, one could imagine preferring the freshest
1059        /// data known to be ingested into Materialize, under the
1060        /// premise that those answers should soon become available,
1061        /// and may be more fresh than the immediate inputs.
1062        FreshestAvailable,
1063        /// Prefer the least valid timestamp.
1064        ///
1065        /// This is useful when one has no expressed freshness
1066        /// constraints, and wants to minimally impact others.
1067        /// For example, `AS OF AT LEAST <time>`.
1068        StalestValid,
1069    }
1070}