mz_adapter/coord/
timeline.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A mechanism to ensure that a sequence of writes and reads proceeds correctly through timestamps.

12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::instrument;
21use mz_ore::now::{EpochMillis, NowFn, to_datetime};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::postgres_oracle::{
27    PostgresTimestampOracle, PostgresTimestampOracleConfig,
28};
29use mz_timestamp_oracle::{self, TimestampOracle, WriteTimestamp};
30use timely::progress::Timestamp as TimelyTimestamp;
31use tracing::{Instrument, debug, error, info};
32
33use crate::AdapterError;
34use crate::coord::Coordinator;
35use crate::coord::id_bundle::CollectionIdBundle;
36use crate::coord::read_policy::ReadHolds;
37use crate::coord::timestamp_selection::TimestampProvider;
38
39/// An enum describing whether or not a query belongs to a timeline and whether the query can be
40/// affected by the timestamp at which it executes.
41#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
42pub enum TimelineContext {
43    /// Can only ever belong to a single specific timeline. The answer will depend on a timestamp
44    /// chosen from that specific timeline.
45    TimelineDependent(Timeline),
46    /// Can belong to any timeline. The answer will depend on a timestamp chosen from some
47    /// timeline.
48    TimestampDependent,
49    /// The answer does not depend on a chosen timestamp.
50    TimestampIndependent,
51}
52
53impl TimelineContext {
54    /// Whether or not the context contains a timeline.
55    pub fn contains_timeline(&self) -> bool {
56        self.timeline().is_some()
57    }
58
59    /// The timeline belonging to this context, if one exists.
60    pub fn timeline(&self) -> Option<&Timeline> {
61        match self {
62            Self::TimelineDependent(timeline) => Some(timeline),
63            Self::TimestampIndependent | Self::TimestampDependent => None,
64        }
65    }
66}
67
68/// Global state for a single timeline.
69///
70/// For each timeline we maintain a timestamp oracle, which is responsible for
71/// providing read (and sometimes write) timestamps, and a set of read holds which
72/// guarantee that those read timestamps are valid.
73pub(crate) struct TimelineState<T: TimelyTimestamp> {
74    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
75    pub(crate) read_holds: ReadHolds<T>,
76}
77
78impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        f.debug_struct("TimelineState")
81            .field("read_holds", &self.read_holds)
82            .finish()
83    }
84}
85
86impl Coordinator {
87    pub(crate) fn now(&self) -> EpochMillis {
88        (self.catalog().config().now)()
89    }
90
91    pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
92        to_datetime(self.now())
93    }
94
95    pub(crate) fn get_timestamp_oracle(
96        &self,
97        timeline: &Timeline,
98    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
99        let oracle = &self
100            .global_timelines
101            .get(timeline)
102            .expect("all timelines have a timestamp oracle")
103            .oracle;
104
105        Arc::clone(oracle)
106    }
107
108    /// Returns a [`TimestampOracle`] used for reads and writes from/to a local input.
109    pub(crate) fn get_local_timestamp_oracle(
110        &self,
111    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
112        self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
113    }
114
115    /// Assign a timestamp for a read from a local input. Reads following writes
116    /// must be at a time >= the write's timestamp; we choose "equal to" for
117    /// simplicity's sake and to open as few new timestamps as possible.
118    pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
119        self.get_local_timestamp_oracle().read_ts().await
120    }
121
122    /// Assign a timestamp for a write to a local input and increase the local ts.
123    /// Writes following reads must ensure that they are assigned a strictly larger
124    /// timestamp to ensure they are not visible to any real-time earlier reads.
125    #[instrument(name = "coord::get_local_write_ts")]
126    pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
127        self.global_timelines
128            .get_mut(&Timeline::EpochMilliseconds)
129            .expect("no realtime timeline")
130            .oracle
131            .write_ts()
132            .await
133    }
134
135    /// Peek the current timestamp used for operations on local inputs. Used to determine how much
136    /// to block group commits by.
137    pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
138        self.get_local_timestamp_oracle().peek_write_ts().await
139    }
140
141    /// Marks a write at `timestamp` as completed, using a [`TimestampOracle`].
142    pub(crate) fn apply_local_write(
143        &self,
144        timestamp: Timestamp,
145    ) -> impl Future<Output = ()> + Send + 'static {
146        let now = self.now().into();
147
148        let upper_bound = upper_bound(&now);
149        if timestamp > upper_bound {
150            error!(
151                %now,
152                "Setting local read timestamp to {timestamp}, which is more than \
153                the desired upper bound {upper_bound}."
154            );
155        }
156
157        let oracle = self.get_local_timestamp_oracle();
158
159        async move {
160            oracle
161                .apply_write(timestamp)
162                .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
163                .await
164        }
165    }
166
167    /// Assign a timestamp for a write to the catalog. This timestamp should have the following
168    /// properties:
169    ///
170    ///   - Monotonically increasing.
171    ///   - Greater than or equal to the current catalog upper.
172    ///   - Greater than the largest write timestamp used in the
173    ///     [epoch millisecond timeline](Timeline::EpochMilliseconds).
174    ///
175    /// In general this is fully satisfied by the getting the current write timestamp in the
176    /// [epoch millisecond timeline](Timeline::EpochMilliseconds) from the timestamp oracle,
177    /// however, in read-only mode we cannot modify the timestamp oracle.
178    pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
179        if self.read_only_controllers {
180            let (write_ts, upper) =
181                futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
182                    .await;
183            std::cmp::max(write_ts, upper)
184        } else {
185            self.get_local_write_ts().await.timestamp
186        }
187    }
188
189    /// Ensures that a global timeline state exists for `timeline`.
190    pub(crate) async fn ensure_timeline_state<'a>(
191        &'a mut self,
192        timeline: &'a Timeline,
193    ) -> &'a mut TimelineState<Timestamp> {
194        Self::ensure_timeline_state_with_initial_time(
195            timeline,
196            Timestamp::minimum(),
197            self.catalog().config().now.clone(),
198            self.pg_timestamp_oracle_config.clone(),
199            &mut self.global_timelines,
200            self.read_only_controllers,
201        )
202        .await
203    }
204
205    /// Ensures that a global timeline state exists for `timeline`, with an initial time
206    /// of `initially`.
207    #[instrument]
208    pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
209        timeline: &'a Timeline,
210        initially: Timestamp,
211        now: NowFn,
212        pg_oracle_config: Option<PostgresTimestampOracleConfig>,
213        global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
214        read_only: bool,
215    ) -> &'a mut TimelineState<Timestamp> {
216        if !global_timelines.contains_key(timeline) {
217            info!(
218                "opening a new CRDB/postgres TimestampOracle for timeline {:?}",
219                timeline,
220            );
221
222            let now_fn = if timeline == &Timeline::EpochMilliseconds {
223                now
224            } else {
225                // Timelines that are not `EpochMilliseconds` don't have an
226                // "external" clock that wants to drive forward timestamps in
227                // addition to the rule that write timestamps must be strictly
228                // monotonically increasing.
229                //
230                // Passing in a clock that always yields the minimum takes the
231                // clock out of the equation and makes timestamps advance only
232                // by the rule about strict monotonicity mentioned above.
233                NowFn::from(|| Timestamp::minimum().into())
234            };
235
236            let pg_oracle_config = pg_oracle_config.expect(
237                        "missing --timestamp-oracle-url even though the crdb-backed timestamp oracle was configured");
238
239            let batching_metrics = Arc::clone(&pg_oracle_config.metrics);
240
241            let pg_oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> = Arc::new(
242                PostgresTimestampOracle::open(
243                    pg_oracle_config,
244                    timeline.to_string(),
245                    initially,
246                    now_fn,
247                    read_only,
248                )
249                .await,
250            );
251
252            let batching_oracle = BatchingTimestampOracle::new(batching_metrics, pg_oracle);
253
254            let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
255                Arc::new(batching_oracle);
256
257            global_timelines.insert(
258                timeline.clone(),
259                TimelineState {
260                    oracle,
261                    read_holds: ReadHolds::new(),
262                },
263            );
264        }
265        global_timelines.get_mut(timeline).expect("inserted above")
266    }
267
268    /// Groups together storage and compute resources into a [`CollectionIdBundle`]
269    pub(crate) fn build_collection_id_bundle(
270        &self,
271        storage_ids: impl IntoIterator<Item = GlobalId>,
272        compute_ids: impl IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
273        clusters: impl IntoIterator<Item = ComputeInstanceId>,
274    ) -> CollectionIdBundle {
275        let mut compute: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
276
277        // Collect all compute_ids.
278        for (instance_id, id) in compute_ids {
279            compute.entry(instance_id).or_default().insert(id);
280        }
281
282        // Collect all GlobalIds associated with a compute instance ID.
283        let cluster_set: BTreeSet<_> = clusters.into_iter().collect();
284        for (_timeline, TimelineState { read_holds, .. }) in &self.global_timelines {
285            let compute_ids = read_holds
286                .compute_ids()
287                .filter(|(instance_id, _id)| cluster_set.contains(instance_id));
288            for (instance_id, id) in compute_ids {
289                compute.entry(instance_id).or_default().insert(id);
290            }
291        }
292
293        CollectionIdBundle {
294            storage_ids: storage_ids.into_iter().collect(),
295            compute_ids: compute,
296        }
297    }
298
299    /// Given a [`Timeline`] and a [`CollectionIdBundle`], removes all of the "storage ids"
300    /// and "compute ids" in the bundle, from the timeline.
301    pub(crate) fn remove_resources_associated_with_timeline(
302        &mut self,
303        timeline: Timeline,
304        ids: CollectionIdBundle,
305    ) -> bool {
306        let TimelineState { read_holds, .. } = self
307            .global_timelines
308            .get_mut(&timeline)
309            .expect("all timeslines have a timestamp oracle");
310
311        // Remove all of the underlying resources.
312        for id in ids.storage_ids {
313            read_holds.remove_storage_collection(id);
314        }
315        for (compute_id, ids) in ids.compute_ids {
316            for id in ids {
317                read_holds.remove_compute_collection(compute_id, id);
318            }
319        }
320        let became_empty = read_holds.is_empty();
321
322        became_empty
323    }
324
325    pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
326    where
327        I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
328    {
329        let mut empty_timelines = BTreeSet::new();
330        for (compute_instance, id) in ids {
331            for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
332                read_holds.remove_compute_collection(compute_instance, id);
333                if read_holds.is_empty() {
334                    empty_timelines.insert(timeline.clone());
335                }
336            }
337        }
338        empty_timelines.into_iter().collect()
339    }
340
341    /// Return the set of ids in a timedomain and verify timeline correctness.
342    ///
343    /// When a user starts a transaction, we need to prevent compaction of anything
344    /// they might read from. We use a heuristic of "anything in the same database
345    /// schemas with the same timeline as whatever the first query is".
346    pub(crate) fn timedomain_for<'a, I>(
347        &self,
348        uses_ids: I,
349        timeline_context: &TimelineContext,
350        conn_id: &ConnectionId,
351        compute_instance: ComputeInstanceId,
352    ) -> Result<CollectionIdBundle, AdapterError>
353    where
354        I: IntoIterator<Item = &'a GlobalId>,
355    {
356        // Gather all the used schemas.
357        let mut schemas = BTreeSet::new();
358        for id in uses_ids {
359            let entry = self.catalog().get_entry_by_global_id(id);
360            let name = entry.name();
361            schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
362        }
363
364        let pg_catalog_schema = (
365            ResolvedDatabaseSpecifier::Ambient,
366            SchemaSpecifier::Id(self.catalog().get_pg_catalog_schema_id()),
367        );
368        let system_schemas: Vec<_> = self
369            .catalog()
370            .system_schema_ids()
371            .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
372            .collect();
373
374        if system_schemas.iter().any(|s| schemas.contains(s)) {
375            // If any of the system schemas is specified, add the rest of the
376            // system schemas.
377            schemas.extend(system_schemas);
378        } else if !schemas.is_empty() {
379            // Always include the pg_catalog schema, if schemas is non-empty. The pg_catalog schemas is
380            // sometimes used by applications in followup queries.
381            schemas.insert(pg_catalog_schema);
382        }
383
384        // Gather the IDs of all items in all used schemas.
385        let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
386        for (db, schema) in schemas {
387            let schema = self.catalog().get_schema(&db, &schema, conn_id);
388            // Note: We include just the latest `GlobalId` instead of all `GlobalId`s associated
389            // with an object, because older versions will already get included, if there are
390            // objects the depend on them.
391            let global_ids = schema
392                .items
393                .values()
394                .map(|item_id| self.catalog().get_entry(item_id).latest_global_id());
395            collection_ids.extend(global_ids);
396        }
397
398        // Gather the dependencies of those items.
399        let mut id_bundle: CollectionIdBundle = self
400            .index_oracle(compute_instance)
401            .sufficient_collections(collection_ids);
402
403        // Filter out ids from different timelines.
404        for ids in [
405            &mut id_bundle.storage_ids,
406            &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
407        ] {
408            ids.retain(|gid| {
409                let id_timeline_context = self
410                    .catalog()
411                    .validate_timeline_context(vec![*gid])
412                    .expect("single id should never fail");
413                match (&id_timeline_context, &timeline_context) {
414                    // If this id doesn't have a timeline, we can keep it.
415                    (
416                        TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
417                        _,
418                    ) => true,
419                    // If there's no source timeline, we have the option to opt into a timeline,
420                    // so optimistically choose epoch ms. This is useful when the first query in a
421                    // transaction is on a static view.
422                    (
423                        TimelineContext::TimelineDependent(id_timeline),
424                        TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
425                    ) => id_timeline == &Timeline::EpochMilliseconds,
426                    // Otherwise check if timelines are the same.
427                    (
428                        TimelineContext::TimelineDependent(id_timeline),
429                        TimelineContext::TimelineDependent(source_timeline),
430                    ) => id_timeline == source_timeline,
431                }
432            });
433        }
434
435        Ok(id_bundle)
436    }
437
438    #[instrument(level = "debug")]
439    pub(crate) async fn advance_timelines(&mut self) {
440        let global_timelines = std::mem::take(&mut self.global_timelines);
441        for (
442            timeline,
443            TimelineState {
444                oracle,
445                mut read_holds,
446            },
447        ) in global_timelines
448        {
449            // Timeline::EpochMilliseconds is advanced in group commits and doesn't need to be
450            // manually advanced here.
451            if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
452                // For non realtime sources, we define now as the largest timestamp, not in
453                // advance of any object's upper. This is the largest timestamp that is closed
454                // to writes.
455                let id_bundle = self.catalog().ids_in_timeline(&timeline);
456
457                // Advance the timeline if-and-only-if there are objects in it.
458                // Otherwise we'd advance to the empty frontier, meaning we
459                // close it off for ever.
460                if !id_bundle.is_empty() {
461                    let least_valid_write = self.least_valid_write(&id_bundle);
462                    let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
463                    oracle.apply_write(now).await;
464                    debug!(
465                        least_valid_write = ?least_valid_write,
466                        oracle_read_ts = ?oracle.read_ts().await,
467                        "advanced {:?} to {}",
468                        timeline,
469                        now,
470                    );
471                }
472            };
473            let read_ts = oracle.read_ts().await;
474            read_holds.downgrade(read_ts);
475            self.global_timelines
476                .insert(timeline, TimelineState { oracle, read_holds });
477        }
478    }
479}
480
481/// Convenience function for calculating the current upper bound that we want to
482/// prevent the global timestamp from exceeding.
483fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
484    const TIMESTAMP_INTERVAL_MS: u64 = 5000;
485    const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
486
487    now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
488}