mz_adapter/coord/timeline.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A mechanism to ensure that a sequence of writes and reads proceed correctly through timestamps.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::Future;
use itertools::Itertools;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View};
use mz_compute_types::ComputeInstanceId;
use mz_expr::CollectionPlan;
use mz_ore::collections::CollectionExt;
use mz_ore::instrument;
use mz_ore::now::{EpochMillis, NowFn, to_datetime};
use mz_ore::vec::VecExt;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_timestamp_oracle::{self, TimestampOracle, WriteTimestamp};
use timely::progress::Timestamp as TimelyTimestamp;
use tracing::{Instrument, debug, error, info};

use crate::AdapterError;
use crate::coord::Coordinator;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::read_policy::ReadHolds;
use crate::coord::timestamp_selection::TimestampProvider;

/// An enum describing whether or not a query belongs to a timeline and whether the query can be
/// affected by the timestamp at which it executes.
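///
/// For example, `SELECT *, 1, mz_now() FROM t` touches all three variants:
/// the table `t` is timeline dependent, `mz_now()` is timestamp dependent,
/// and the constant `1` is timestamp independent.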
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Can only ever belong to a single specific timeline. The answer will depend on a timestamp
    /// chosen from that specific timeline.
    TimelineDependent(Timeline),
    /// Can belong to any timeline. The answer will depend on a timestamp chosen from some
    /// timeline.
    TimestampDependent,
    /// The answer does not depend on a chosen timestamp.
    TimestampIndependent,
}

impl TimelineContext {
    /// Whether or not the context contains a timeline.
    pub fn contains_timeline(&self) -> bool {
        self.timeline().is_some()
    }

    /// The timeline belonging to this context, if one exists.
    pub fn timeline(&self) -> Option<&Timeline> {
        match self {
            Self::TimelineDependent(timeline) => Some(timeline),
            Self::TimestampIndependent | Self::TimestampDependent => None,
        }
    }
}

/// Global state for a single timeline.
///
/// For each timeline we maintain a timestamp oracle, which is responsible for
/// providing read (and sometimes write) timestamps, and a set of read holds which
/// guarantee that those read timestamps are valid.
pub(crate) struct TimelineState<T: TimelyTimestamp> {
    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
    pub(crate) read_holds: ReadHolds<T>,
}

impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TimelineState")
            .field("read_holds", &self.read_holds)
            .finish()
    }
}

impl Coordinator {
    pub(crate) fn now(&self) -> EpochMillis {
        (self.catalog().config().now)()
    }

    pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
        to_datetime(self.now())
    }

    pub(crate) fn get_timestamp_oracle(
        &self,
        timeline: &Timeline,
    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
        let oracle = &self
            .global_timelines
            .get(timeline)
            .expect("all timelines have a timestamp oracle")
            .oracle;

        Arc::clone(oracle)
    }

    /// Returns a [`TimestampOracle`] used for reads and writes from/to a local input.
    pub(crate) fn get_local_timestamp_oracle(
        &self,
    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
        self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
    }

    /// Assign a timestamp for a read from a local input. Reads following writes
    /// must be at a time >= the write's timestamp; we choose "equal to" for
    /// simplicity's sake and to open as few new timestamps as possible.
    pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
        self.get_local_timestamp_oracle().read_ts().await
    }

    /// Assign a timestamp for a write to a local input and increase the local ts.
    /// Writes following reads must ensure that they are assigned a strictly larger
    /// timestamp to ensure they are not visible to any real-time earlier reads.
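    ///
    /// A sketch of the intended ordering (not a doctest; assumes a mutable
    /// `coord: Coordinator` is in scope):
    ///
    /// ```ignore
    /// let read_ts = coord.get_local_read_ts().await;
    /// let write_ts = coord.get_local_write_ts().await.timestamp;
    /// assert!(write_ts > read_ts); // writes after reads are strictly later
    /// ```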
    #[instrument(name = "coord::get_local_write_ts")]
    pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
        self.global_timelines
            .get_mut(&Timeline::EpochMilliseconds)
            .expect("no realtime timeline")
            .oracle
            .write_ts()
            .await
    }

    /// Peek the current timestamp used for operations on local inputs. Used to determine how much
    /// to block group commits by.
    pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
        self.get_local_timestamp_oracle().peek_write_ts().await
    }

    /// Marks a write at `timestamp` as completed, using a [`TimestampOracle`].
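    ///
    /// A sketch of a complete write sequence (not a doctest; assumes a mutable
    /// `coord: Coordinator` and elides the write itself):
    ///
    /// ```ignore
    /// let WriteTimestamp { timestamp, .. } = coord.get_local_write_ts().await;
    /// // ... perform the write at `timestamp` ...
    /// coord.apply_local_write(timestamp).await; // make the write readable
    /// ```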
    pub(crate) fn apply_local_write(
        &self,
        timestamp: Timestamp,
    ) -> impl Future<Output = ()> + Send + 'static {
        let now = self.now().into();

        let upper_bound = upper_bound(&now);
        if timestamp > upper_bound {
            error!(
                %now,
                "Setting local read timestamp to {timestamp}, which is more than \
                the desired upper bound {upper_bound}."
            );
        }

        let oracle = self.get_local_timestamp_oracle();

        async move {
            oracle
                .apply_write(timestamp)
                .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
                .await
        }
    }

    /// Assign a timestamp for a write to the catalog. This timestamp should have the following
    /// properties:
    ///
    ///   - Monotonically increasing.
    ///   - Greater than or equal to the current catalog upper.
    ///   - Greater than the largest write timestamp used in the
    ///     [epoch millisecond timeline](Timeline::EpochMilliseconds).
    ///
    /// In general this is fully satisfied by getting the current write timestamp in the
    /// [epoch millisecond timeline](Timeline::EpochMilliseconds) from the timestamp oracle.
    /// However, in read-only mode we cannot modify the timestamp oracle.
    pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
        if self.read_only_controllers {
            let (write_ts, upper) =
                futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
                    .await;
            std::cmp::max(write_ts, upper)
        } else {
            self.get_local_write_ts().await.timestamp
        }
    }

    /// Ensures that a global timeline state exists for `timeline`.
    pub(crate) async fn ensure_timeline_state<'a>(
        &'a mut self,
        timeline: &'a Timeline,
    ) -> &'a mut TimelineState<Timestamp> {
        Self::ensure_timeline_state_with_initial_time(
            timeline,
            Timestamp::minimum(),
            self.catalog().config().now.clone(),
            self.pg_timestamp_oracle_config.clone(),
            &mut self.global_timelines,
            self.read_only_controllers,
        )
        .await
    }

    /// Ensures that a global timeline state exists for `timeline`, with an initial time
    /// of `initially`.
    #[instrument]
    pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
        timeline: &'a Timeline,
        initially: Timestamp,
        now: NowFn,
        pg_oracle_config: Option<PostgresTimestampOracleConfig>,
        global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
        read_only: bool,
    ) -> &'a mut TimelineState<Timestamp> {
        if !global_timelines.contains_key(timeline) {
            info!(
                "opening a new CRDB/postgres TimestampOracle for timeline {:?}",
                timeline,
            );

            let now_fn = if timeline == &Timeline::EpochMilliseconds {
                now
            } else {
                // Timelines that are not `EpochMilliseconds` don't have an
                // "external" clock that wants to drive forward timestamps in
                // addition to the rule that write timestamps must be strictly
                // monotonically increasing.
                //
                // Passing in a clock that always yields the minimum takes the
                // clock out of the equation and makes timestamps advance only
                // by the rule about strict monotonicity mentioned above.
                NowFn::from(|| Timestamp::minimum().into())
            };

            let pg_oracle_config = pg_oracle_config.expect(
                "missing --timestamp-oracle-url even though the crdb-backed timestamp oracle was configured",
            );

            let batching_metrics = Arc::clone(&pg_oracle_config.metrics);

            let pg_oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> = Arc::new(
                PostgresTimestampOracle::open(
                    pg_oracle_config,
                    timeline.to_string(),
                    initially,
                    now_fn,
                    read_only,
                )
                .await,
            );

            let batching_oracle = BatchingTimestampOracle::new(batching_metrics, pg_oracle);

            let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
                Arc::new(batching_oracle);

            global_timelines.insert(
                timeline.clone(),
                TimelineState {
                    oracle,
                    read_holds: ReadHolds::new(),
                },
            );
        }
        global_timelines.get_mut(timeline).expect("inserted above")
    }

    /// Groups together storage and compute resources into a [`CollectionIdBundle`].
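    ///
    /// A hypothetical call (sketch, not a doctest; the ids are placeholders):
    ///
    /// ```ignore
    /// let bundle = coord.build_collection_id_bundle(
    ///     [table_gid],                // storage collections
    ///     [(cluster_id, index_gid)],  // explicit compute collections
    ///     [cluster_id],               // also include held compute ids on these clusters
    /// );
    /// ```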
    pub(crate) fn build_collection_id_bundle(
        &self,
        storage_ids: impl IntoIterator<Item = GlobalId>,
        compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
        clusters: impl IntoIterator<Item = ComputeInstanceId>,
    ) -> CollectionIdBundle {
        let mut compute: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();

        // Collect all compute_ids.
        for (instance_id, id) in compute_ids {
            compute.entry(instance_id).or_default().insert(id);
        }

        // Collect all GlobalIds associated with a compute instance ID.
        let cluster_set: BTreeSet<_> = clusters.into_iter().collect();
        for (_timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
            let compute_ids = read_holds
                .compute_ids()
                .filter(|(instance_id, _id)| cluster_set.contains(instance_id));
            for (instance_id, id) in compute_ids {
                compute.entry(instance_id).or_default().insert(id);
            }
        }

        CollectionIdBundle {
            storage_ids: storage_ids.into_iter().collect(),
            compute_ids: compute,
        }
    }

    /// Given a [`Timeline`] and a [`CollectionIdBundle`], removes all of the bundle's
    /// storage and compute ids from the timeline's read holds.
    ///
    /// Returns whether the timeline's set of read holds became empty.
    pub(crate) fn remove_resources_associated_with_timeline(
        &mut self,
        timeline: Timeline,
        ids: CollectionIdBundle,
    ) -> bool {
        let TimelineState { read_holds, .. } = self
            .global_timelines
            .get_mut(&timeline)
            .expect("all timelines have a timestamp oracle");

        // Remove all of the underlying resources.
        for id in ids.storage_ids {
            read_holds.remove_storage_collection(id);
        }
        for (compute_id, ids) in ids.compute_ids {
            for id in ids {
                read_holds.remove_compute_collection(compute_id, id);
            }
        }

        read_holds.is_empty()
    }

    pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
    where
        I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
    {
        let mut empty_timelines = BTreeSet::new();
        for (compute_instance, id) in ids {
            for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
                read_holds.remove_compute_collection(compute_instance, id);
                if read_holds.is_empty() {
                    empty_timelines.insert(timeline.clone());
                }
            }
        }
        empty_timelines.into_iter().collect()
    }

    pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle {
        let mut id_bundle = CollectionIdBundle::default();
        for entry in self.catalog().entries() {
            if let TimelineContext::TimelineDependent(entry_timeline) =
                self.get_timeline_context(entry.id())
            {
                if timeline == &entry_timeline {
                    match entry.item() {
                        CatalogItem::Table(table) => {
                            id_bundle.storage_ids.extend(table.global_ids());
                        }
                        CatalogItem::Source(source) => {
                            id_bundle.storage_ids.insert(source.global_id());
                        }
                        CatalogItem::MaterializedView(mv) => {
                            id_bundle.storage_ids.insert(mv.global_id());
                        }
                        CatalogItem::ContinualTask(ct) => {
                            id_bundle.storage_ids.insert(ct.global_id());
                        }
                        CatalogItem::Index(index) => {
                            id_bundle
                                .compute_ids
                                .entry(index.cluster_id)
                                .or_default()
                                .insert(index.global_id());
                        }
                        CatalogItem::View(_)
                        | CatalogItem::Sink(_)
                        | CatalogItem::Type(_)
                        | CatalogItem::Func(_)
                        | CatalogItem::Secret(_)
                        | CatalogItem::Connection(_)
                        | CatalogItem::Log(_) => {}
                    }
                }
            }
        }
        id_bundle
    }

    /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should
    /// be used to prevent users from doing things that are either meaningless
    /// (joining data from timelines that have similar numbers with different
    /// meanings, like two separate debezium topics) or will never complete (joining
    /// cdcv2 and realtime data).
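    ///
    /// A sketch of the failure mode (not a doctest; `mixed_ids` is a placeholder
    /// for ids drawn from a realtime source and a debezium-style source in its
    /// own timeline):
    ///
    /// ```ignore
    /// let err = coord.validate_timeline_context(mixed_ids).unwrap_err();
    /// // => AdapterError::Unsupported("multiple timelines within one dataflow")
    /// ```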
    pub(crate) fn validate_timeline_context<I>(
        &self,
        ids: I,
    ) -> Result<TimelineContext, AdapterError>
    where
        I: IntoIterator<Item = GlobalId>,
    {
        let items_ids = ids
            .into_iter()
            .filter_map(|gid| self.catalog().try_resolve_item_id(&gid));
        let mut timeline_contexts: Vec<_> =
            self.get_timeline_contexts(items_ids).into_iter().collect();
        // If there's more than one timeline, we will not produce meaningful
        // data for a user. Take, for example, some realtime source and a debezium
        // consistency topic source. The realtime source uses something close to now
        // for its timestamps. The debezium source starts at 1 and increments per
        // transaction. We don't want to choose some timestamp that is valid for both
        // of these because the debezium source will never get to the same value as the
        // realtime source's "milliseconds since Unix epoch" value. And even if it did,
        // it's not meaningful to join just because those two numbers happen to be the
        // same now.
        //
        // Another example: assume two separate debezium consistency topics. Both
        // start counting at 1 and thus have similar numbers that probably overlap
        // a lot. However, it's still not meaningful to join those two at a specific
        // transaction counter number because those counters are unrelated to each
        // other.
        let timelines: Vec<_> = timeline_contexts
            .drain_filter_swapping(|timeline_context| timeline_context.contains_timeline())
            .collect();

        // A single object or group of objects may contain multiple compatible timeline
        // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all
        // types of contexts. We choose the strongest context level to return back.
        if timelines.len() > 1 {
            Err(AdapterError::Unsupported(
                "multiple timelines within one dataflow",
            ))
        } else if timelines.len() == 1 {
            Ok(timelines.into_element())
        } else if timeline_contexts
            .iter()
            .contains(&TimelineContext::TimestampDependent)
        {
            Ok(TimelineContext::TimestampDependent)
        } else {
            Ok(TimelineContext::TimestampIndependent)
        }
    }

    /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists.
    pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext {
        let entry = self.catalog().get_entry(&id);
        self.validate_timeline_context(entry.global_ids())
            .expect("impossible for a single object to belong to incompatible timeline contexts")
    }

    /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists.
    pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext {
        self.validate_timeline_context(vec![id])
            .expect("impossible for a single object to belong to incompatible timeline contexts")
    }

    /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist.
    fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
    where
        I: IntoIterator<Item = CatalogItemId>,
    {
        let mut seen: BTreeSet<CatalogItemId> = BTreeSet::new();
        let mut timelines: BTreeSet<TimelineContext> = BTreeSet::new();

        // Recurse through IDs to find all sources and tables, adding new ones to
        // the set until we reach the bottom.
        let mut ids: Vec<_> = ids.into_iter().collect();
        while let Some(id) = ids.pop() {
            // Protect against possible infinite recursion. Not sure if it's possible, but
            // a cheap prevention for the future.
            if !seen.insert(id) {
                continue;
            }
            if let Some(entry) = self.catalog().try_get_entry(&id) {
                match entry.item() {
                    CatalogItem::Source(source) => {
                        timelines
                            .insert(TimelineContext::TimelineDependent(source.timeline.clone()));
                    }
                    CatalogItem::Index(index) => {
                        let on_id = self.catalog().resolve_item_id(&index.on);
                        ids.push(on_id);
                    }
                    CatalogItem::View(View { optimized_expr, .. }) => {
                        // If the definition contains a temporal function, the timeline must
                        // be timestamp dependent.
                        if optimized_expr.contains_temporal() {
                            timelines.insert(TimelineContext::TimestampDependent);
                        } else {
                            timelines.insert(TimelineContext::TimestampIndependent);
                        }
                        let item_ids = optimized_expr
                            .depends_on()
                            .into_iter()
                            .map(|gid| self.catalog().resolve_item_id(&gid));
                        ids.extend(item_ids);
                    }
                    CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => {
                        // In some cases the timestamp selected may not affect the answer to a
                        // query, but it may affect our ability to query the materialized view.
                        // Materialized views must durably materialize the result of a query, even
                        // for constant queries. If we choose a timestamp larger than the upper,
                        // which represents the current progress of the view, then the query will
                        // need to block and wait for the materialized view to advance.
                        timelines.insert(TimelineContext::TimestampDependent);
                        let item_ids = optimized_expr
                            .depends_on()
                            .into_iter()
                            .map(|gid| self.catalog().resolve_item_id(&gid));
                        ids.extend(item_ids);
                    }
                    CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
                        // See the comment on the `MaterializedView` case above.
                        timelines.insert(TimelineContext::TimestampDependent);
                        let item_ids = raw_expr
                            .depends_on()
                            .into_iter()
                            .map(|gid| self.catalog().resolve_item_id(&gid));
                        ids.extend(item_ids);
                    }
                    CatalogItem::Table(table) => {
                        timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
                    }
                    CatalogItem::Log(_) => {
                        timelines.insert(TimelineContext::TimelineDependent(
                            Timeline::EpochMilliseconds,
                        ));
                    }
                    CatalogItem::Sink(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Connection(_) => {}
                }
            }
        }

        timelines
    }

    /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id
    /// belongs to.
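    ///
    /// A sketch of consuming the partitions (not a doctest):
    ///
    /// ```ignore
    /// for (timeline_context, bundle) in coord.partition_ids_by_timeline_context(&id_bundle) {
    ///     // choose a timestamp appropriate for `timeline_context`, covering `bundle`
    /// }
    /// ```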
    pub fn partition_ids_by_timeline_context(
        &self,
        id_bundle: &CollectionIdBundle,
    ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)> + use<> {
        let mut res: BTreeMap<TimelineContext, CollectionIdBundle> = BTreeMap::new();

        for gid in &id_bundle.storage_ids {
            let timeline_context = self.get_timeline_context_for_global_id(*gid);
            res.entry(timeline_context)
                .or_default()
                .storage_ids
                .insert(*gid);
        }

        for (compute_instance, ids) in &id_bundle.compute_ids {
            for gid in ids {
                let timeline_context = self.get_timeline_context_for_global_id(*gid);
                res.entry(timeline_context)
                    .or_default()
                    .compute_ids
                    .entry(*compute_instance)
                    .or_default()
                    .insert(*gid);
            }
        }

        res.into_iter()
    }

    /// Return the set of ids in a timedomain and verify timeline correctness.
    ///
    /// When a user starts a transaction, we need to prevent compaction of anything
    /// they might read from. We use a heuristic of "anything in the same database
    /// schemas with the same timeline as whatever the first query is".
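    ///
    /// For example (a sketch): if a transaction's first query reads a table in
    /// schema `db.s`, the timedomain includes the other timeline-compatible
    /// collections in `db.s` (plus `pg_catalog`), so follow-up queries in the
    /// transaction can still read them.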
    pub(crate) fn timedomain_for<'a, I>(
        &self,
        uses_ids: I,
        timeline_context: &TimelineContext,
        conn_id: &ConnectionId,
        compute_instance: ComputeInstanceId,
    ) -> Result<CollectionIdBundle, AdapterError>
    where
        I: IntoIterator<Item = &'a GlobalId>,
    {
        // Gather all the used schemas.
        let mut schemas = BTreeSet::new();
        for id in uses_ids {
            let entry = self.catalog().get_entry_by_global_id(id);
            let name = entry.name();
            schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
        }

        let pg_catalog_schema = (
            ResolvedDatabaseSpecifier::Ambient,
            SchemaSpecifier::Id(self.catalog().get_pg_catalog_schema_id()),
        );
        let system_schemas: Vec<_> = self
            .catalog()
            .system_schema_ids()
            .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
            .collect();

        if system_schemas.iter().any(|s| schemas.contains(s)) {
            // If any of the system schemas is specified, add the rest of the
            // system schemas.
            schemas.extend(system_schemas);
        } else if !schemas.is_empty() {
            // Always include the pg_catalog schema if schemas is non-empty. The pg_catalog
            // schema is sometimes used by applications in follow-up queries.
            schemas.insert(pg_catalog_schema);
        }

        // Gather the IDs of all items in all used schemas.
        let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
        for (db, schema) in schemas {
            let schema = self.catalog().get_schema(&db, &schema, conn_id);
            // Note: We include just the latest `GlobalId` instead of all `GlobalId`s associated
            // with an object, because older versions will already get included if there are
            // objects that depend on them.
            let global_ids = schema
                .items
                .values()
                .map(|item_id| self.catalog().get_entry(item_id).latest_global_id());
            collection_ids.extend(global_ids);
        }

        // Gather the dependencies of those items.
        let mut id_bundle: CollectionIdBundle = self
            .index_oracle(compute_instance)
            .sufficient_collections(collection_ids);

        // Filter out ids from different timelines.
        for ids in [
            &mut id_bundle.storage_ids,
            id_bundle.compute_ids.entry(compute_instance).or_default(),
        ] {
            ids.retain(|gid| {
                let id_timeline_context = self
                    .validate_timeline_context(vec![*gid])
                    .expect("single id should never fail");
                match (&id_timeline_context, &timeline_context) {
                    // If this id doesn't have a timeline, we can keep it.
                    (
                        TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                        _,
                    ) => true,
                    // If there's no source timeline, we have the option to opt into a timeline,
                    // so optimistically choose epoch ms. This is useful when the first query in a
                    // transaction is on a static view.
                    (
                        TimelineContext::TimelineDependent(id_timeline),
                        TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                    ) => id_timeline == &Timeline::EpochMilliseconds,
                    // Otherwise, check whether the timelines are the same.
                    (
                        TimelineContext::TimelineDependent(id_timeline),
                        TimelineContext::TimelineDependent(source_timeline),
                    ) => id_timeline == source_timeline,
                }
            });
        }

        Ok(id_bundle)
    }

    #[instrument(level = "debug")]
    pub(crate) async fn advance_timelines(&mut self) {
        let global_timelines = std::mem::take(&mut self.global_timelines);
        for (
            timeline,
            TimelineState {
                oracle,
                mut read_holds,
            },
        ) in global_timelines
        {
            // Timeline::EpochMilliseconds is advanced in group commits and doesn't need to be
            // manually advanced here.
            if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
                // For non-realtime sources, we define "now" as the largest timestamp not in
                // advance of any object's upper. This is the largest timestamp that is closed
                // to writes.
                let id_bundle = self.ids_in_timeline(&timeline);

                // Advance the timeline if-and-only-if there are objects in it.
                // Otherwise we'd advance to the empty frontier, meaning we
                // would close it off forever.
                if !id_bundle.is_empty() {
                    let least_valid_write = self.least_valid_write(&id_bundle);
                    let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
                    oracle.apply_write(now).await;
                    debug!(
                        least_valid_write = ?least_valid_write,
                        oracle_read_ts = ?oracle.read_ts().await,
                        "advanced {:?} to {}",
                        timeline,
                        now,
                    );
                }
            }
            let read_ts = oracle.read_ts().await;
            read_holds.downgrade(read_ts);
            self.global_timelines
                .insert(timeline, TimelineState { oracle, read_holds });
        }
    }
}

/// Convenience function for calculating the current upper bound that we want to
/// prevent the global timestamp from exceeding.
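///
/// With the constants below, the bound is `now + 5000 * 2`, i.e. 10 seconds
/// past `now`; e.g. `upper_bound(&1_000.into())` is `11_000`.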
fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
    const TIMESTAMP_INTERVAL_MS: u64 = 5000;
    const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;

    now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
}