mz_adapter/coord/
timeline.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A mechanism to ensure that a sequence of writes and reads proceeds correctly through timestamps.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::instrument;
21use mz_ore::now::{EpochMillis, NowFn, to_datetime};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::postgres_oracle::{
27    PostgresTimestampOracle, PostgresTimestampOracleConfig,
28};
29use mz_timestamp_oracle::{self, TimestampOracle, WriteTimestamp};
30use timely::progress::Timestamp as TimelyTimestamp;
31use tracing::{Instrument, debug, error, info};
32
33use crate::AdapterError;
34use crate::catalog::Catalog;
35use crate::coord::Coordinator;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::read_policy::ReadHolds;
38use crate::coord::timestamp_selection::TimestampProvider;
39use crate::optimize::dataflows::DataflowBuilder;
40
41/// An enum describing whether or not a query belongs to a timeline and whether the query can be
42/// affected by the timestamp at which it executes.
43#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
44pub enum TimelineContext {
45    /// Can only ever belong to a single specific timeline. The answer will depend on a timestamp
46    /// chosen from that specific timeline.
47    TimelineDependent(Timeline),
48    /// Can belong to any timeline. The answer will depend on a timestamp chosen from some
49    /// timeline.
50    TimestampDependent,
51    /// The answer does not depend on a chosen timestamp.
52    TimestampIndependent,
53}
54
55impl TimelineContext {
56    /// Whether or not the context contains a timeline.
57    pub fn contains_timeline(&self) -> bool {
58        self.timeline().is_some()
59    }
60
61    /// The timeline belonging to this context, if one exists.
62    pub fn timeline(&self) -> Option<&Timeline> {
63        match self {
64            Self::TimelineDependent(timeline) => Some(timeline),
65            Self::TimestampIndependent | Self::TimestampDependent => None,
66        }
67    }
68}
69
70/// Global state for a single timeline.
71///
72/// For each timeline we maintain a timestamp oracle, which is responsible for
73/// providing read (and sometimes write) timestamps, and a set of read holds which
74/// guarantee that those read timestamps are valid.
75pub(crate) struct TimelineState<T: TimelyTimestamp> {
76    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
77    pub(crate) read_holds: ReadHolds<T>,
78}
79
80impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        f.debug_struct("TimelineState")
83            .field("read_holds", &self.read_holds)
84            .finish()
85    }
86}
87
88impl Coordinator {
89    pub(crate) fn now(&self) -> EpochMillis {
90        (self.catalog().config().now)()
91    }
92
93    pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
94        to_datetime(self.now())
95    }
96
97    pub(crate) fn get_timestamp_oracle(
98        &self,
99        timeline: &Timeline,
100    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
101        let oracle = &self
102            .global_timelines
103            .get(timeline)
104            .expect("all timelines have a timestamp oracle")
105            .oracle;
106
107        Arc::clone(oracle)
108    }
109
110    /// Returns a [`TimestampOracle`] used for reads and writes from/to a local input.
111    pub(crate) fn get_local_timestamp_oracle(
112        &self,
113    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
114        self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
115    }
116
117    /// Assign a timestamp for a read from a local input. Reads following writes
118    /// must be at a time >= the write's timestamp; we choose "equal to" for
119    /// simplicity's sake and to open as few new timestamps as possible.
120    pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
121        self.get_local_timestamp_oracle().read_ts().await
122    }
123
124    /// Assign a timestamp for a write to a local input and increase the local ts.
125    /// Writes following reads must ensure that they are assigned a strictly larger
126    /// timestamp to ensure they are not visible to any real-time earlier reads.
127    #[instrument(name = "coord::get_local_write_ts")]
128    pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
129        self.global_timelines
130            .get_mut(&Timeline::EpochMilliseconds)
131            .expect("no realtime timeline")
132            .oracle
133            .write_ts()
134            .await
135    }
136
137    /// Peek the current timestamp used for operations on local inputs. Used to determine how much
138    /// to block group commits by.
139    pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
140        self.get_local_timestamp_oracle().peek_write_ts().await
141    }
142
143    /// Marks a write at `timestamp` as completed, using a [`TimestampOracle`].
144    pub(crate) fn apply_local_write(
145        &self,
146        timestamp: Timestamp,
147    ) -> impl Future<Output = ()> + Send + 'static {
148        let now = self.now().into();
149
150        let upper_bound = upper_bound(&now);
151        if timestamp > upper_bound {
152            error!(
153                %now,
154                "Setting local read timestamp to {timestamp}, which is more than \
155                the desired upper bound {upper_bound}."
156            );
157        }
158
159        let oracle = self.get_local_timestamp_oracle();
160
161        async move {
162            oracle
163                .apply_write(timestamp)
164                .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
165                .await
166        }
167    }
168
169    /// Assign a timestamp for a write to the catalog. This timestamp should have the following
170    /// properties:
171    ///
172    ///   - Monotonically increasing.
173    ///   - Greater than or equal to the current catalog upper.
174    ///   - Greater than the largest write timestamp used in the
175    ///     [epoch millisecond timeline](Timeline::EpochMilliseconds).
176    ///
177    /// In general this is fully satisfied by the getting the current write timestamp in the
178    /// [epoch millisecond timeline](Timeline::EpochMilliseconds) from the timestamp oracle,
179    /// however, in read-only mode we cannot modify the timestamp oracle.
180    pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
181        if self.read_only_controllers {
182            let (write_ts, upper) =
183                futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
184                    .await;
185            std::cmp::max(write_ts, upper)
186        } else {
187            self.get_local_write_ts().await.timestamp
188        }
189    }
190
191    /// Ensures that a global timeline state exists for `timeline`.
192    pub(crate) async fn ensure_timeline_state<'a>(
193        &'a mut self,
194        timeline: &'a Timeline,
195    ) -> &'a mut TimelineState<Timestamp> {
196        Self::ensure_timeline_state_with_initial_time(
197            timeline,
198            Timestamp::minimum(),
199            self.catalog().config().now.clone(),
200            self.pg_timestamp_oracle_config.clone(),
201            &mut self.global_timelines,
202            self.read_only_controllers,
203        )
204        .await
205    }
206
207    /// Ensures that a global timeline state exists for `timeline`, with an initial time
208    /// of `initially`.
209    #[instrument]
210    pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
211        timeline: &'a Timeline,
212        initially: Timestamp,
213        now: NowFn,
214        pg_oracle_config: Option<PostgresTimestampOracleConfig>,
215        global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
216        read_only: bool,
217    ) -> &'a mut TimelineState<Timestamp> {
218        if !global_timelines.contains_key(timeline) {
219            info!(
220                "opening a new CRDB/postgres TimestampOracle for timeline {:?}",
221                timeline,
222            );
223
224            let now_fn = if timeline == &Timeline::EpochMilliseconds {
225                now
226            } else {
227                // Timelines that are not `EpochMilliseconds` don't have an
228                // "external" clock that wants to drive forward timestamps in
229                // addition to the rule that write timestamps must be strictly
230                // monotonically increasing.
231                //
232                // Passing in a clock that always yields the minimum takes the
233                // clock out of the equation and makes timestamps advance only
234                // by the rule about strict monotonicity mentioned above.
235                NowFn::from(|| Timestamp::minimum().into())
236            };
237
238            let pg_oracle_config = pg_oracle_config.expect(
239                        "missing --timestamp-oracle-url even though the crdb-backed timestamp oracle was configured");
240
241            let batching_metrics = Arc::clone(&pg_oracle_config.metrics);
242
243            let pg_oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> = Arc::new(
244                PostgresTimestampOracle::open(
245                    pg_oracle_config,
246                    timeline.to_string(),
247                    initially,
248                    now_fn,
249                    read_only,
250                )
251                .await,
252            );
253
254            let batching_oracle = BatchingTimestampOracle::new(batching_metrics, pg_oracle);
255
256            let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
257                Arc::new(batching_oracle);
258
259            global_timelines.insert(
260                timeline.clone(),
261                TimelineState {
262                    oracle,
263                    read_holds: ReadHolds::new(),
264                },
265            );
266        }
267        global_timelines.get_mut(timeline).expect("inserted above")
268    }
269
270    /// Given a [`Timeline`] and a [`CollectionIdBundle`], removes all of the "storage ids"
271    /// and "compute ids" in the bundle, from the timeline.
272    pub(crate) fn remove_resources_associated_with_timeline(
273        &mut self,
274        timeline: Timeline,
275        ids: CollectionIdBundle,
276    ) -> bool {
277        let TimelineState { read_holds, .. } = self
278            .global_timelines
279            .get_mut(&timeline)
280            .expect("all timeslines have a timestamp oracle");
281
282        // Remove all of the underlying resources.
283        for id in ids.storage_ids {
284            read_holds.remove_storage_collection(id);
285        }
286        for (compute_id, ids) in ids.compute_ids {
287            for id in ids {
288                read_holds.remove_compute_collection(compute_id, id);
289            }
290        }
291        let became_empty = read_holds.is_empty();
292
293        became_empty
294    }
295
296    pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
297    where
298        I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
299    {
300        let mut empty_timelines = BTreeSet::new();
301        for (compute_instance, id) in ids {
302            for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
303                read_holds.remove_compute_collection(compute_instance, id);
304                if read_holds.is_empty() {
305                    empty_timelines.insert(timeline.clone());
306                }
307            }
308        }
309        empty_timelines.into_iter().collect()
310    }
311
312    #[instrument(level = "debug")]
313    pub(crate) async fn advance_timelines(&mut self) {
314        let global_timelines = std::mem::take(&mut self.global_timelines);
315        for (
316            timeline,
317            TimelineState {
318                oracle,
319                mut read_holds,
320            },
321        ) in global_timelines
322        {
323            // Timeline::EpochMilliseconds is advanced in group commits and doesn't need to be
324            // manually advanced here.
325            if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
326                // For non realtime sources, we define now as the largest timestamp, not in
327                // advance of any object's upper. This is the largest timestamp that is closed
328                // to writes.
329                let id_bundle = self.catalog().ids_in_timeline(&timeline);
330
331                // Advance the timeline if-and-only-if there are objects in it.
332                // Otherwise we'd advance to the empty frontier, meaning we
333                // close it off for ever.
334                if !id_bundle.is_empty() {
335                    let least_valid_write = self.least_valid_write(&id_bundle);
336                    let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
337                    oracle.apply_write(now).await;
338                    debug!(
339                        least_valid_write = ?least_valid_write,
340                        oracle_read_ts = ?oracle.read_ts().await,
341                        "advanced {:?} to {}",
342                        timeline,
343                        now,
344                    );
345                }
346            };
347            let read_ts = oracle.read_ts().await;
348            read_holds.downgrade(read_ts);
349            self.global_timelines
350                .insert(timeline, TimelineState { oracle, read_holds });
351        }
352    }
353}
354
355/// Convenience function for calculating the current upper bound that we want to
356/// prevent the global timestamp from exceeding.
357fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
358    const TIMESTAMP_INTERVAL_MS: u64 = 5000;
359    const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
360
361    now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
362}
363
364/// Return the set of ids in a timedomain and verify timeline correctness.
365///
366/// When a user starts a transaction, we need to prevent compaction of anything
367/// they might read from. We use a heuristic of "anything in the same database
368/// schemas with the same timeline as whatever the first query is".
369///
370/// This is a free-standing function that can be called from both the old peek sequencing
371/// and the new frontend peek sequencing.
372pub(crate) fn timedomain_for<'a, I>(
373    catalog: &Catalog,
374    dataflow_builder: &DataflowBuilder,
375    uses_ids: I,
376    timeline_context: &TimelineContext,
377    conn_id: &ConnectionId,
378    compute_instance: ComputeInstanceId,
379) -> Result<CollectionIdBundle, AdapterError>
380where
381    I: IntoIterator<Item = &'a GlobalId>,
382{
383    // Gather all the used schemas.
384    let mut schemas = BTreeSet::new();
385    for id in uses_ids {
386        let entry = catalog.get_entry_by_global_id(id);
387        let name = entry.name();
388        schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
389    }
390
391    let pg_catalog_schema = (
392        ResolvedDatabaseSpecifier::Ambient,
393        SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
394    );
395    let system_schemas: Vec<_> = catalog
396        .system_schema_ids()
397        .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
398        .collect();
399
400    if system_schemas.iter().any(|s| schemas.contains(s)) {
401        // If any of the system schemas is specified, add the rest of the
402        // system schemas.
403        schemas.extend(system_schemas);
404    } else if !schemas.is_empty() {
405        // Always include the pg_catalog schema, if schemas is non-empty. The pg_catalog schemas is
406        // sometimes used by applications in followup queries.
407        schemas.insert(pg_catalog_schema);
408    }
409
410    // Gather the IDs of all items in all used schemas.
411    let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
412    for (db, schema) in schemas {
413        let schema = catalog.get_schema(&db, &schema, conn_id);
414        // Note: We include just the latest `GlobalId` instead of all `GlobalId`s associated
415        // with an object, because older versions will already get included, if there are
416        // objects the depend on them.
417        let global_ids = schema
418            .items
419            .values()
420            .map(|item_id| catalog.get_entry(item_id).latest_global_id());
421        collection_ids.extend(global_ids);
422    }
423
424    // Gather the dependencies of those items.
425    let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids);
426
427    // Filter out ids from different timelines.
428    for ids in [
429        &mut id_bundle.storage_ids,
430        &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
431    ] {
432        ids.retain(|gid| {
433            let id_timeline_context = catalog
434                .validate_timeline_context(vec![*gid])
435                .expect("single id should never fail");
436            match (&id_timeline_context, &timeline_context) {
437                // If this id doesn't have a timeline, we can keep it.
438                (
439                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
440                    _,
441                ) => true,
442                // If there's no source timeline, we have the option to opt into a timeline,
443                // so optimistically choose epoch ms. This is useful when the first query in a
444                // transaction is on a static view.
445                (
446                    TimelineContext::TimelineDependent(id_timeline),
447                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
448                ) => *id_timeline == Timeline::EpochMilliseconds,
449                // Otherwise check if timelines are the same.
450                (
451                    TimelineContext::TimelineDependent(id_timeline),
452                    TimelineContext::TimelineDependent(source_timeline),
453                ) => id_timeline == source_timeline,
454            }
455        });
456    }
457
458    Ok(id_bundle)
459}