Skip to main content

mz_adapter/coord/
timestamp_selection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic for selecting timestamps for various operations on collections.
11
12use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::cast::CastLossy;
21use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
22use mz_sql::plan::QueryWhen;
23use mz_sql::session::vars::IsolationLevel;
24use mz_storage_types::sources::Timeline;
25use serde::{Deserialize, Serialize};
26use timely::progress::{Antichain, Timestamp as _};
27
28use crate::AdapterError;
29use crate::catalog::CatalogState;
30use crate::coord::Coordinator;
31use crate::coord::id_bundle::CollectionIdBundle;
32use crate::coord::read_policy::ReadHolds;
33use crate::coord::timeline::TimelineContext;
34use crate::session::Session;
35
/// The timeline and timestamp context of a read.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext {
    /// Read is executed in a specific timeline with a specific timestamp.
    TimelineTimestamp {
        /// The timeline in which the read is executed.
        timeline: Timeline,
        /// The timestamp that was chosen for a read. This can differ from the
        /// `oracle_ts` when collections are not readable at the (linearized)
        /// timestamp for the oracle. In those cases (when the chosen timestamp
        /// is further ahead than the oracle timestamp) we have to delay
        /// returning peek results until the timestamp oracle is also
        /// sufficiently advanced.
        chosen_ts: Timestamp,
        /// The timestamp that would have been chosen for the read by the
        /// (linearized) timestamp oracle. In most cases this will be picked as
        /// the `chosen_ts`. `None` when no oracle timestamp was supplied.
        oracle_ts: Option<Timestamp>,
    },
    /// Read is executed without a timeline or timestamp.
    NoTimestamp,
}
57
58impl TimestampContext {
59    /// Creates a `TimestampContext` from a timestamp and `TimelineContext`.
60    pub fn from_timeline_context(
61        chosen_ts: Timestamp,
62        oracle_ts: Option<Timestamp>,
63        transaction_timeline: Option<Timeline>,
64        timeline_context: &TimelineContext,
65    ) -> TimestampContext {
66        match timeline_context {
67            TimelineContext::TimelineDependent(timeline) => {
68                if let Some(transaction_timeline) = transaction_timeline {
69                    assert_eq!(timeline, &transaction_timeline);
70                }
71                Self::TimelineTimestamp {
72                    timeline: timeline.clone(),
73                    chosen_ts,
74                    oracle_ts,
75                }
76            }
77            TimelineContext::TimestampDependent => {
78                // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
79                Self::TimelineTimestamp {
80                    timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
81                    chosen_ts,
82                    oracle_ts,
83                }
84            }
85            TimelineContext::TimestampIndependent => Self::NoTimestamp,
86        }
87    }
88
89    /// The timeline belonging to this context, if one exists.
90    pub fn timeline(&self) -> Option<&Timeline> {
91        self.timeline_timestamp().map(|tt| tt.0)
92    }
93
94    /// The timestamp belonging to this context, if one exists.
95    pub fn timestamp(&self) -> Option<&Timestamp> {
96        self.timeline_timestamp().map(|tt| tt.1)
97    }
98
99    /// The timeline and timestamp belonging to this context, if one exists.
100    pub fn timeline_timestamp(&self) -> Option<(&Timeline, &Timestamp)> {
101        match self {
102            Self::TimelineTimestamp {
103                timeline,
104                chosen_ts,
105                ..
106            } => Some((timeline, chosen_ts)),
107            Self::NoTimestamp => None,
108        }
109    }
110
111    /// The timestamp belonging to this context, or a sensible default if one does not exists.
112    pub fn timestamp_or_default(&self) -> Timestamp {
113        match self {
114            Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
115            // Anything without a timestamp is given the maximum possible timestamp to indicate
116            // that they have been closed up until the end of time. This allows us to SUBSCRIBE to
117            // static views.
118            Self::NoTimestamp => Timestamp::maximum(),
119        }
120    }
121
122    /// Whether or not the context contains a timestamp.
123    pub fn contains_timestamp(&self) -> bool {
124        self.timestamp().is_some()
125    }
126
127    /// Converts this `TimestampContext` to an `Antichain`.
128    pub fn antichain(&self) -> Antichain<Timestamp> {
129        Antichain::from_elem(self.timestamp_or_default())
130    }
131}
132
133#[async_trait(?Send)]
134impl TimestampProvider for Coordinator {
135    /// Reports a collection's current read frontier.
136    fn compute_read_frontier(
137        &self,
138        instance: ComputeInstanceId,
139        id: GlobalId,
140    ) -> Antichain<Timestamp> {
141        self.controller
142            .compute
143            .collection_frontiers(id, Some(instance))
144            .expect("id does not exist")
145            .read_frontier
146    }
147
148    /// Reports a collection's current write frontier.
149    fn compute_write_frontier(
150        &self,
151        instance: ComputeInstanceId,
152        id: GlobalId,
153    ) -> Antichain<Timestamp> {
154        self.controller
155            .compute
156            .collection_frontiers(id, Some(instance))
157            .expect("id does not exist")
158            .write_frontier
159    }
160
161    fn storage_frontiers(
162        &self,
163        ids: Vec<GlobalId>,
164    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
165        self.controller
166            .storage
167            .collections_frontiers(ids)
168            .expect("missing collections")
169    }
170
171    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds {
172        self.acquire_read_holds(id_bundle)
173    }
174
175    fn catalog_state(&self) -> &CatalogState {
176        self.catalog().state()
177    }
178}
179
/// A timestamp determination, which includes the timestamp, constraints, and session oracle read
/// timestamp.
///
/// This is the result of `TimestampProvider::determine_timestamp_via_constraints`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination {
    /// The timestamp that was selected from the constraints and preferences.
    pub timestamp: Timestamp,
    /// The (minimized) constraints that were applied while selecting `timestamp`.
    pub constraints: Constraints,
    /// The read timestamp of the session's timestamp oracle. Only populated under
    /// `IsolationLevel::StrongSessionSerializable`, and only when the session has an oracle for
    /// the timeline of the query.
    pub session_oracle_read_ts: Option<Timestamp>,
}
188
189#[async_trait(?Send)]
190pub trait TimestampProvider {
    /// Reports the current read frontier of the compute collection `id` on the compute
    /// instance `instance`.
    fn compute_read_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
    /// Reports the current write frontier of the compute collection `id` on the compute
    /// instance `instance`.
    fn compute_write_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
201
    /// Returns the implied capability (since) and write frontier (upper) for
    /// the specified storage collections.
    ///
    /// Each returned tuple is `(id, since, upper)`.
    fn storage_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
208
    /// Returns a reference to the catalog state known to this provider.
    fn catalog_state(&self) -> &CatalogState;
210
211    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
212        let timeline = match timeline_context {
213            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
214            // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
215            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
216            TimelineContext::TimestampIndependent => None,
217        };
218
219        timeline
220    }
221
222    /// Returns true if-and-only-if the given configuration needs a linearized
223    /// read timestamp from a timestamp oracle.
224    ///
225    /// This assumes that the query happens in the context of a timeline. If
226    /// there is no timeline, we cannot and don't have to get a linearized read
227    /// timestamp.
228    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
229        // When we're in the context of a timeline (assumption) and one of these
230        // scenarios hold, we need to use a linearized read timestamp:
231        // - The isolation level is Strict Serializable and the `when` allows us to use the
232        //   the timestamp oracle (ex: queries with no AS OF).
233        // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries).
234        when.must_advance_to_timeline_ts()
235            || (when.can_advance_to_timeline_ts()
236                && matches!(
237                    isolation_level,
238                    IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
239                ))
240    }
241
    /// Uses constraints and preferences to determine a timestamp for a query.
    /// Returns the determined timestamp, the constraints that were applied, and
    /// session_oracle_read_ts.
    ///
    /// The procedure has three phases: collect hard lower/upper bound constraints, pick a
    /// preference (freshest available vs. stalest valid), and then derive a candidate
    /// timestamp that is checked against the constraints.
    ///
    /// # Errors
    ///
    /// Returns `AdapterError::ImpossibleTimestampConstraints` when the candidate timestamp
    /// does not satisfy the collected lower and upper bounds.
    ///
    /// # Panics
    ///
    /// Panics if `real_time_recency_ts` is supplied while real time recency is disabled for
    /// the session or the isolation level is not strict serializable.
    fn determine_timestamp_via_constraints(
        session: &Session,
        read_holds: &ReadHolds,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
        timeline: &Option<Timeline>,
        largest_not_in_advance_of_upper: Timestamp,
    ) -> Result<RawTimestampDetermination, AdapterError> {
        use constraints::{Constraints, Preference, Reason};

        // Only populated under Strong Session Serializable, see below.
        let mut session_oracle_read_ts = None;
        // We start by establishing the hard constraints that must be applied to timestamp determination.
        // These constraints are derived from the input arguments, and properties of the collections involved.
        // TODO: Many of the constraints are expressed obliquely, and could be made more direct.
        let constraints = {
            // Constraints we will populate through a sequence of opinions.
            let mut constraints = Constraints::default();

            // First, we have validity constraints from the `id_bundle` argument which indicates
            // which collections we are reading from.
            // TODO: Refine the detail about which identifiers are binding and which are not.
            // TODO(dov): It's not entirely clear to me that there ever would be a non
            // binding constraint introduced by the `id_bundle`. We should revisit this.
            let since = read_holds.least_valid_read();
            let storage = id_bundle
                .storage_ids
                .iter()
                .cloned()
                .collect::<Vec<GlobalId>>();
            if !storage.is_empty() {
                constraints
                    .lower
                    .push((since.clone(), Reason::StorageInput(storage)));
            }
            let compute = id_bundle
                .compute_ids
                .iter()
                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
            if !compute.is_empty() {
                constraints
                    .lower
                    .push((since.clone(), Reason::ComputeInput(compute)));
            }

            // The query's `when` may indicate a specific timestamp we must advance to, or a specific value we must use.
            if let Some(ts) = when.advance_to_timestamp() {
                constraints
                    .lower
                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
                // If the query is at a specific timestamp, we must introduce an upper bound as well.
                if when.constrains_upper() {
                    constraints
                        .upper
                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
                }
            }

            // The specification of an `oracle_read_ts` may indicate that we must advance to it,
            // except in one isolation mode, or if `when` does not indicate that we should.
            // At the moment, only `QueryWhen::FreshestTableWrite` indicates that we should.
            // TODO: Should this just depend on the isolation level?
            if let Some(timestamp) = &oracle_read_ts {
                if isolation_level != &IsolationLevel::StrongSessionSerializable
                    || when.must_advance_to_timeline_ts()
                {
                    // An `oracle_read_ts` was supplied and applies to this query, so we must
                    // advance to it. If none was supplied, no constraint is added here.
                    constraints.lower.push((
                        Antichain::from_elem(*timestamp),
                        Reason::IsolationLevel(*isolation_level),
                    ));
                }
            }

            // If a real time recency timestamp is supplied, we must advance to it.
            if let Some(real_time_recency_ts) = real_time_recency_ts {
                assert!(
                    session.vars().real_time_recency()
                        && isolation_level == &IsolationLevel::StrictSerializable,
                    "real time recency timestamp should only be supplied when real time recency \
                                is enabled and the isolation level is strict serializable"
                );
                constraints.lower.push((
                    Antichain::from_elem(real_time_recency_ts),
                    Reason::RealTimeRecency,
                ));
            }

            // If we are operating in Strong Session Serializable, we use an alternate timestamp lower bound.
            if isolation_level == &IsolationLevel::StrongSessionSerializable {
                if let Some(timeline) = &timeline {
                    // The session-local oracle, if the session has one for this timeline,
                    // contributes its read timestamp as a lower bound.
                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
                        let session_ts = oracle.read_ts();
                        constraints.lower.push((
                            Antichain::from_elem(session_ts),
                            Reason::IsolationLevel(*isolation_level),
                        ));
                        session_oracle_read_ts = Some(session_ts);
                    }

                    // When advancing the read timestamp under Strong Session Serializable, there is a
                    // trade-off to make between freshness and latency. We can choose a timestamp close to the
                    // `upper`, but then later queries might block if the `upper` is too far into the
                    // future. We can choose a timestamp close to the current time, but then we may not be
                    // getting results that are as fresh as possible. As a heuristic, we choose the minimum
                    // of now and the upper, where we use the global timestamp oracle read timestamp as a
                    // proxy for now. If upper > now, then we choose now and prevent blocking future
                    // queries. If upper < now, then we choose the upper and prevent blocking the current
                    // query.
                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
                        let mut advance_to = largest_not_in_advance_of_upper;
                        if let Some(oracle_read_ts) = oracle_read_ts {
                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
                        }
                        constraints.lower.push((
                            Antichain::from_elem(advance_to),
                            Reason::IsolationLevel(*isolation_level),
                        ));
                    }
                }
            }

            constraints.minimize();
            constraints
        };

        // Next we establish the preferences that we would like to apply to timestamp determination.
        // Generally, we want to choose the freshest timestamp possible, although there are exceptions
        // when we either want a maximally *stale* timestamp, or we want to protect other queries from
        // a recklessly advanced timestamp.
        let preferences = {
            // Counter-intuitively, the only `when` that allows `can_advance_to_upper` is `Immediately`,
            // and not `FreshestTableWrite`. This is because `FreshestTableWrite` instead imposes a lower
            // bound through the `oracle_read_ts`, and then requires the stalest valid timestamp.

            if when.can_advance_to_upper()
                && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
            {
                Preference::FreshestAvailable
            } else {
                Preference::StalestValid
            }

            // TODO: `StrongSessionSerializable` has a different set of preferences that starts to tease
            // out the trade-off between freshness and responsiveness. I think we don't yet know enough
            // to properly frame these preferences, though they are clearly aimed at the right concerns.
        };

        // Determine a candidate based on constraints and preferences.
        let constraint_candidate = {
            let mut candidate = Timestamp::minimum();
            // Note: These `advance_by` calls are no-ops if the given frontier is `[]`.
            candidate.advance_by(constraints.lower_bound().borrow());
            // If we have a preference to be the freshest available, advance to the minimum
            // of the upper bound constraints and the `largest_not_in_advance_of_upper`.
            if let Preference::FreshestAvailable = preferences {
                let mut upper_bound = constraints.upper_bound();
                upper_bound.insert(largest_not_in_advance_of_upper);
                candidate.advance_by(upper_bound.borrow());
            }
            // If the candidate is strictly outside the constraints, we didn't have a viable
            // timestamp. This can happen e.g. when the query has AS OF, or when the lower bound is
            // `[]`.
            if !constraints.lower_bound().less_equal(&candidate)
                || constraints.upper_bound().less_than(&candidate)
            {
                return Err(AdapterError::ImpossibleTimestampConstraints {
                    constraints: constraints.display(timeline.as_ref()).to_string(),
                });
            } else {
                candidate
            }
        };

        Ok(RawTimestampDetermination {
            timestamp: constraint_candidate,
            constraints,
            session_oracle_read_ts,
        })
    }
429
430    /// Determines the timestamp for a query.
431    ///
432    /// Timestamp determination may fail due to the restricted validity of
433    /// traces. Each has a `since` and `upper` frontier, and are only valid
434    /// after `since` and sure to be available not after `upper`.
435    ///
436    /// The timeline that `id_bundle` belongs to is also returned, if one exists.
437    fn determine_timestamp_for(
438        &self,
439        session: &Session,
440        id_bundle: &CollectionIdBundle,
441        when: &QueryWhen,
442        timeline_context: &TimelineContext,
443        oracle_read_ts: Option<Timestamp>,
444        real_time_recency_ts: Option<Timestamp>,
445        isolation_level: &IsolationLevel,
446    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
447        // First, we acquire read holds that will ensure the queried collections
448        // stay queryable at the chosen timestamp.
449        let read_holds = self.acquire_read_holds(id_bundle);
450
451        let upper = self.least_valid_write(id_bundle);
452
453        Self::determine_timestamp_for_inner(
454            session,
455            id_bundle,
456            when,
457            timeline_context,
458            oracle_read_ts,
459            real_time_recency_ts,
460            isolation_level,
461            read_holds,
462            upper,
463        )
464    }
465
466    /// Same as determine_timestamp_for, but read_holds and least_valid_write are already passed in.
467    fn determine_timestamp_for_inner(
468        session: &Session,
469        id_bundle: &CollectionIdBundle,
470        when: &QueryWhen,
471        timeline_context: &TimelineContext,
472        oracle_read_ts: Option<Timestamp>,
473        real_time_recency_ts: Option<Timestamp>,
474        isolation_level: &IsolationLevel,
475        read_holds: ReadHolds,
476        upper: Antichain<Timestamp>,
477    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
478        let timeline = Self::get_timeline(timeline_context);
479        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
480        let since = read_holds.least_valid_read();
481
482        // If the `since` is empty, then timestamp determination would fail. Let's return a more
483        // specific error in this case: Empty `since` frontiers happen here when collections were
484        // dropped concurrently with sequencing the query.
485        if since.is_empty() {
486            // Figure out what made the since frontier empty.
487            let mut unreadable_collections = Vec::new();
488            for (coll_id, hold) in read_holds.storage_holds {
489                if hold.since().is_empty() {
490                    unreadable_collections.push(coll_id);
491                }
492            }
493            for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
494                if hold.since().is_empty() {
495                    unreadable_collections.push(coll_id);
496                }
497            }
498            return Err(AdapterError::CollectionUnreadable {
499                id: unreadable_collections.into_iter().join(", "),
500            });
501        }
502
503        let raw_determination = Self::determine_timestamp_via_constraints(
504            session,
505            &read_holds,
506            id_bundle,
507            when,
508            oracle_read_ts,
509            real_time_recency_ts,
510            isolation_level,
511            &timeline,
512            largest_not_in_advance_of_upper,
513        )?;
514
515        let timestamp_context = TimestampContext::from_timeline_context(
516            raw_determination.timestamp,
517            oracle_read_ts,
518            timeline,
519            timeline_context,
520        );
521
522        let determination = TimestampDetermination {
523            timestamp_context,
524            since,
525            upper,
526            largest_not_in_advance_of_upper,
527            oracle_read_ts,
528            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
529            real_time_recency_ts,
530            constraints: raw_determination.constraints,
531        };
532
533        Ok((determination, read_holds))
534    }
535
    /// Acquires [ReadHolds] for the collections in the given `id_bundle`, at the earliest
    /// possible times.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds;
539
540    /// The smallest common valid write frontier among the specified collections.
541    ///
542    /// Times that are not greater or equal to this frontier are complete for all collections
543    /// identified as arguments.
544    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
545        let mut upper = Antichain::new();
546        {
547            for (_id, _since, collection_upper) in
548                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
549            {
550                upper.extend(collection_upper);
551            }
552        }
553        {
554            for (instance, compute_ids) in &id_bundle.compute_ids {
555                for id in compute_ids.iter() {
556                    upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
557                }
558            }
559        }
560        upper
561    }
562
563    /// Returns `least_valid_write` - 1, i.e., each time in `least_valid_write` stepped back in a
564    /// saturating way.
565    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
566        let mut frontier = Antichain::new();
567        for t in self.least_valid_write(id_bundle) {
568            frontier.insert(t.step_back().unwrap_or(t));
569        }
570        frontier
571    }
572}
573
574impl Coordinator {
575    pub(crate) async fn oracle_read_ts(
576        &self,
577        session: &Session,
578        timeline_ctx: &TimelineContext,
579        when: &QueryWhen,
580    ) -> Option<Timestamp> {
581        let isolation_level = session.vars().transaction_isolation().clone();
582        let timeline = Coordinator::get_timeline(timeline_ctx);
583        let needs_linearized_read_ts =
584            Coordinator::needs_linearized_read_ts(&isolation_level, when);
585
586        let oracle_read_ts = match timeline {
587            Some(timeline) if needs_linearized_read_ts => {
588                let timestamp_oracle = self.get_timestamp_oracle(&timeline);
589                Some(timestamp_oracle.read_ts().await)
590            }
591            Some(_) | None => None,
592        };
593
594        oracle_read_ts
595    }
596
597    /// Determines the timestamp for a query, acquires read holds that ensure the
598    /// query remains executable at that time, and returns those.
599    /// The caller is responsible for eventually dropping those read holds.
600    #[mz_ore::instrument(level = "debug")]
601    pub(crate) fn determine_timestamp(
602        &self,
603        session: &Session,
604        id_bundle: &CollectionIdBundle,
605        when: &QueryWhen,
606        compute_instance: ComputeInstanceId,
607        timeline_context: &TimelineContext,
608        oracle_read_ts: Option<Timestamp>,
609        real_time_recency_ts: Option<mz_repr::Timestamp>,
610    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
611        let isolation_level = session.vars().transaction_isolation();
612        let (det, read_holds) = self.determine_timestamp_for(
613            session,
614            id_bundle,
615            when,
616            timeline_context,
617            oracle_read_ts,
618            real_time_recency_ts,
619            isolation_level,
620        )?;
621        self.metrics
622            .determine_timestamp
623            .with_label_values(&[
624                match det.respond_immediately() {
625                    true => "true",
626                    false => "false",
627                },
628                isolation_level.as_str(),
629                &compute_instance.to_string(),
630            ])
631            .inc();
632        if !det.respond_immediately()
633            && isolation_level == &IsolationLevel::StrictSerializable
634            && real_time_recency_ts.is_none()
635        {
636            // Note down the difference between StrictSerializable and Serializable into a metric.
637            if let Some(strict) = det.timestamp_context.timestamp() {
638                let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
639                    session,
640                    id_bundle,
641                    when,
642                    timeline_context,
643                    oracle_read_ts,
644                    real_time_recency_ts,
645                    &IsolationLevel::Serializable,
646                )?;
647
648                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
649                    self.metrics
650                        .timestamp_difference_for_strict_serializable_ms
651                        .with_label_values(&[compute_instance.to_string().as_str()])
652                        .observe(f64::cast_lossy(u64::from(
653                            strict.saturating_sub(*serializable),
654                        )));
655                }
656            }
657        }
658        Ok((det, read_holds))
659    }
660
661    /// The largest timestamp not greater or equal to an element of `upper`.
662    ///
663    /// If no such timestamp exists, for example because `upper` contains only the
664    /// minimal timestamp, the return value is `Timestamp::minimum()`.
665    pub(crate) fn largest_not_in_advance_of_upper(
666        upper: &Antichain<mz_repr::Timestamp>,
667    ) -> mz_repr::Timestamp {
668        // We peek at the largest element not in advance of `upper`, which
669        // involves a subtraction. If `upper` contains a zero timestamp there
670        // is no "prior" answer, and we do not want to peek at it as it risks
671        // hanging awaiting the response to data that may never arrive.
672        if let Some(upper) = upper.as_option() {
673            upper.step_back().unwrap_or_else(Timestamp::minimum)
674        } else {
675            // A complete trace can be read in its final form with this time.
676            //
677            // This should only happen for literals that have no sources or sources that
678            // are known to have completed (non-tailed files for example).
679            Timestamp::MAX
680        }
681    }
682}
683
/// Information used when determining the timestamp for a query.
///
/// Records the chosen timestamp context together with the inputs (frontiers, oracle
/// timestamps, constraints) that led to it.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination {
    /// The chosen timestamp context from `determine_timestamp`.
    pub timestamp_context: TimestampContext,
    /// The read frontier of all involved sources.
    pub since: Antichain<Timestamp>,
    /// The write frontier of all involved sources.
    pub upper: Antichain<Timestamp>,
    /// The largest timestamp not in advance of upper.
    pub largest_not_in_advance_of_upper: Timestamp,
    /// The value of the timeline's oracle timestamp, if used.
    pub oracle_read_ts: Option<Timestamp>,
    /// The read timestamp of the session-local timestamp oracle, if used.
    pub session_oracle_read_ts: Option<Timestamp>,
    /// The value of the real time recency timestamp, if used.
    pub real_time_recency_ts: Option<Timestamp>,
    /// The constraints used by the constraint based solver.
    /// See the [`constraints`] module for more information.
    pub constraints: Constraints,
}
705
706impl TimestampDetermination {
707    pub fn respond_immediately(&self) -> bool {
708        match &self.timestamp_context {
709            TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
710                !self.upper.less_equal(chosen_ts)
711            }
712            TimestampContext::NoTimestamp => true,
713        }
714    }
715}
716
/// A timestamp determination bundled with the per-source details needed to
/// explain it.
///
/// The `Display` implementation of this type renders all of the fields below
/// as a line-oriented report.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation {
    /// The timestamp determination being explained.
    pub determination: TimestampDetermination,
    /// Details about each source.
    pub sources: Vec<TimestampSource>,
    /// Wall time of first statement executed in this transaction
    pub session_wall_time: DateTime<Utc>,
    /// Cached value of determination.respond_immediately()
    pub respond_immediately: bool,
}
729
/// Frontier details for one source listed in a [`TimestampExplanation`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource {
    /// Name of the source, printed in the `source <name>:` heading.
    pub name: String,
    /// The times making up the source's read frontier.
    pub read_frontier: Vec<Timestamp>,
    /// The times making up the source's write frontier.
    pub write_frontier: Vec<Timestamp>,
}
736
/// Values whose textual rendering may depend on the timeline they belong to.
pub trait DisplayableInTimeline {
    /// Formats `self` into `f`, given the timeline (if any) it is displayed in.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Pairs `self` with `timeline` in a [`DisplayInTimeline`] wrapper, which can
    /// then be used with the standard formatting macros.
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
743
744impl DisplayableInTimeline for mz_repr::Timestamp {
745    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
746        if let Some(Timeline::EpochMilliseconds) = timeline {
747            let ts_ms: u64 = self.into();
748            if let Ok(ts_ms) = i64::try_from(ts_ms) {
749                if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
750                    return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
751                }
752            }
753        }
754        write!(f, "{:13}", self)
755    }
756}
757
/// A borrowed value paired with the timeline it should be displayed in.
///
/// Created by [`DisplayableInTimeline::display`].
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to render.
    t: &'a T,
    /// The timeline passed along to the value's `fmt`, if any.
    timeline: Option<&'a Timeline>,
}
762impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
763where
764    T: DisplayableInTimeline,
765{
766    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
767        self.t.fmt(self.timeline, f)
768    }
769}
770
771impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
772where
773    T: DisplayableInTimeline,
774{
775    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
776        fmt::Display::fmt(&self, f)
777    }
778}
779
780impl fmt::Display for TimestampExplanation {
781    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
782        let timeline = self.determination.timestamp_context.timeline();
783        writeln!(
784            f,
785            "                query timestamp: {}",
786            self.determination
787                .timestamp_context
788                .timestamp_or_default()
789                .display(timeline)
790        )?;
791        if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
792            writeln!(
793                f,
794                "          oracle read timestamp: {}",
795                oracle_read_ts.display(timeline)
796            )?;
797        }
798        if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
799            writeln!(
800                f,
801                "  session oracle read timestamp: {}",
802                session_oracle_read_ts.display(timeline)
803            )?;
804        }
805        if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
806            writeln!(
807                f,
808                "    real time recency timestamp: {}",
809                real_time_recency_ts.display(timeline)
810            )?;
811        }
812        writeln!(
813            f,
814            "largest not in advance of upper: {}",
815            self.determination
816                .largest_not_in_advance_of_upper
817                .display(timeline),
818        )?;
819        writeln!(
820            f,
821            "                          upper:{:?}",
822            self.determination
823                .upper
824                .iter()
825                .map(|t| t.display(timeline))
826                .collect::<Vec<_>>()
827        )?;
828        writeln!(
829            f,
830            "                          since:{:?}",
831            self.determination
832                .since
833                .iter()
834                .map(|t| t.display(timeline))
835                .collect::<Vec<_>>()
836        )?;
837        writeln!(
838            f,
839            "        can respond immediately: {}",
840            self.respond_immediately
841        )?;
842        writeln!(f, "                       timeline: {:?}", &timeline)?;
843        writeln!(
844            f,
845            "              session wall time: {:13} ({})",
846            self.session_wall_time.timestamp_millis(),
847            self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
848        )?;
849
850        for source in &self.sources {
851            writeln!(f, "")?;
852            writeln!(f, "source {}:", source.name)?;
853            writeln!(
854                f,
855                "                  read frontier:{:?}",
856                source
857                    .read_frontier
858                    .iter()
859                    .map(|t| t.display(timeline))
860                    .collect::<Vec<_>>()
861            )?;
862            writeln!(
863                f,
864                "                 write frontier:{:?}",
865                source
866                    .write_frontier
867                    .iter()
868                    .map(|t| t.display(timeline))
869                    .collect::<Vec<_>>()
870            )?;
871        }
872
873        writeln!(f, "")?;
874        writeln!(f, "binding constraints:")?;
875        write!(f, "{}", self.determination.constraints.display(timeline))?;
876
877        Ok(())
878    }
879}
880
881/// Types and logic in support of a constraint-based approach to timestamp determination.
882mod constraints {
883
884    use core::fmt;
885    use std::fmt::Debug;
886
887    use differential_dataflow::lattice::Lattice;
888    use mz_storage_types::sources::Timeline;
889    use serde::{Deserialize, Serialize};
890    use timely::progress::{Antichain, Timestamp};
891
892    use mz_compute_types::ComputeInstanceId;
893    use mz_repr::GlobalId;
894    use mz_sql::session::vars::IsolationLevel;
895
896    use super::DisplayableInTimeline;
897
    /// Constraints expressed on the timestamp of a query.
    ///
    /// The constraints are expressed on the minimum and maximum values,
    /// resulting in a (possibly empty) interval of valid timestamps.
    ///
    /// The constraints may be redundant, in the interest of providing
    /// more complete explanations, but they may also be minimized at
    /// any point without altering their behavior by removing redundant
    /// constraints (see [`Constraints::minimize`]).
    ///
    /// When combined with a [`Preference`] one can determine an
    /// ideal timestamp to use.
    #[derive(Default, Serialize, Deserialize, Clone)]
    pub struct Constraints {
        /// Timestamps and reasons that impose an inclusive lower bound.
        ///
        /// Each entry pairs a frontier with the reason it was imposed.
        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
        /// Timestamps and reasons that impose an inclusive upper bound.
        ///
        /// Each entry pairs a frontier with the reason it was imposed.
        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
    }
917
918    impl DisplayableInTimeline for Constraints {
919        fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
920            if !self.lower.is_empty() {
921                writeln!(f, "lower:")?;
922                for (ts, reason) in &self.lower {
923                    let ts: Vec<_> = ts
924                        .iter()
925                        .map(|t| format!("{}", t.display(timeline)))
926                        .collect();
927                    writeln!(f, "  ({}): [{}]", reason, ts.join(", "))?;
928                }
929            }
930            if !self.upper.is_empty() {
931                writeln!(f, "upper:")?;
932                for (ts, reason) in &self.upper {
933                    let ts: Vec<_> = ts
934                        .iter()
935                        .map(|t| format!("{}", t.display(timeline)))
936                        .collect();
937                    writeln!(f, "  ({}): [{}]", reason, ts.join(", "))?;
938                }
939            }
940            Ok(())
941        }
942    }
943
944    impl Debug for Constraints {
945        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
946            self.display(None).fmt(f)?;
947            Ok(())
948        }
949    }
950
951    impl Constraints {
952        /// Remove constraints that are dominated by other constraints.
953        ///
954        /// This removes redundant constraints, without removing constraints
955        /// that are "tight" in the sense that the interval would be
956        /// meaningfully different without them.
957        /// For example, two constraints at the same
958        /// time will both be retained, in the interest of full information.
959        /// But a lower bound constraint at time `t` will be removed if there is a
960        /// constraint at time `t + 1` (or any larger time).
961        pub fn minimize(&mut self) {
962            // Establish the upper bound of lower constraints.
963            let lower_frontier = self.lower_bound();
964            // Retain constraints that intersect `lower_frontier`.
965            self.lower.retain(|(anti, _)| {
966                anti.iter()
967                    .any(|time| lower_frontier.elements().contains(time))
968            });
969
970            // Establish the lower bound of upper constraints.
971            let upper_frontier = self.upper_bound();
972            // Retain constraints that intersect `upper_frontier`.
973            self.upper.retain(|(anti, _)| {
974                anti.iter()
975                    .any(|time| upper_frontier.elements().contains(time))
976            });
977        }
978
979        /// An antichain equal to the least upper bound of lower bounds.
980        pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
981            let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
982            for (anti, _) in self.lower.iter() {
983                lower = lower.join(anti);
984            }
985            lower
986        }
987        /// An antichain equal to the greatest lower bound of upper bounds.
988        pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
989            self.upper
990                .iter()
991                .flat_map(|(anti, _)| anti.iter())
992                .cloned()
993                .collect()
994        }
995    }
996
    /// An explanation of reasons for a timestamp constraint.
    ///
    /// Each constraint in [`Constraints`] carries one of these, and it is
    /// printed next to the constraint's times when constraints are displayed.
    #[derive(Serialize, Deserialize, Clone)]
    pub enum Reason {
        /// A compute input at a compute instance.
        /// This is something like an index or view
        /// that is maintained by compute.
        ///
        /// Holds the `(instance, collection)` pairs involved.
        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
        /// A storage input.
        ///
        /// Holds the ids of the collections involved.
        StorageInput(Vec<GlobalId>),
        /// A specified isolation level and the timestamp it requires.
        IsolationLevel(IsolationLevel),
        /// Real-time recency may constrain the timestamp from below.
        RealTimeRecency,
        /// The query expressed its own constraint on the timestamp.
        QueryAsOf,
    }
1013
1014    impl fmt::Display for Reason {
1015        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1016            match self {
1017                Reason::ComputeInput(ids) => {
1018                    let formatted: Vec<_> =
1019                        ids.iter().map(|(c, g)| format!("({}, {})", c, g)).collect();
1020                    write!(f, "Indexed inputs: [{}]", formatted.join(", "))
1021                }
1022                Reason::StorageInput(ids) => {
1023                    let formatted: Vec<_> = ids.iter().map(|g| format!("{}", g)).collect();
1024                    write!(f, "Storage inputs: [{}]", formatted.join(", "))
1025                }
1026                Reason::IsolationLevel(level) => {
1027                    write!(f, "Isolation level: {:?}", level)
1028                }
1029                Reason::RealTimeRecency => {
1030                    write!(f, "Real-time recency")
1031                }
1032                Reason::QueryAsOf => {
1033                    write!(f, "Query's AS OF")
1034                }
1035            }
1036        }
1037    }
1038
1039    /// Given an interval [read, write) of timestamp options,
1040    /// this expresses a preference for either end of the spectrum.
1041    pub enum Preference {
1042        /// Prefer the greatest timestamp immediately available.
1043        ///
1044        /// This considers the immediate inputs to a query and
1045        /// selects the greatest timestamp not greater or equal
1046        /// to any of their write frontiers.
1047        ///
1048        /// The preference only relates to immediate query inputs,
1049        /// but it could be extended to transitive inputs as well.
1050        /// For example, one could imagine preferring the freshest
1051        /// data known to be ingested into Materialize, under the
1052        /// premise that those answers should soon become available,
1053        /// and may be more fresh than the immediate inputs.
1054        FreshestAvailable,
1055        /// Prefer the least valid timestamp.
1056        ///
1057        /// This is useful when one has no expressed freshness
1058        /// constraints, and wants to minimally impact others.
1059        /// For example, `AS OF AT LEAST <time>`.
1060        StalestValid,
1061    }
1062}