// mz_adapter/coord/timeline.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A mechanism to ensure that a sequence of writes and reads proceed correctly through timestamps.

12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::now::{EpochMillis, NowFn, to_datetime};
21use mz_ore::{instrument, soft_assert_or_log};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::{self, TimestampOracle, TimestampOracleConfig, WriteTimestamp};
27use timely::progress::Timestamp as TimelyTimestamp;
28use tracing::{Instrument, debug, error, info};
29
30use crate::AdapterError;
31use crate::catalog::Catalog;
32use crate::coord::Coordinator;
33use crate::coord::id_bundle::CollectionIdBundle;
34use crate::coord::read_policy::ReadHolds;
35use crate::coord::timestamp_selection::TimestampProvider;
36use crate::optimize::dataflows::DataflowBuilder;
37
/// An enum describing whether or not a query belongs to a timeline and whether the query can be
/// affected by the timestamp at which it executes.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Can only ever belong to a single specific timeline. The answer will depend on a timestamp
    /// chosen from that specific timeline.
    TimelineDependent(Timeline),
    /// Can belong to any timeline. The answer will depend on a timestamp chosen from some
    /// timeline.
    TimestampDependent,
    /// The answer does not depend on a chosen timestamp.
    TimestampIndependent,
}
51
52impl TimelineContext {
53    /// Whether or not the context contains a timeline.
54    pub fn contains_timeline(&self) -> bool {
55        self.timeline().is_some()
56    }
57
58    /// The timeline belonging to this context, if one exists.
59    pub fn timeline(&self) -> Option<&Timeline> {
60        match self {
61            Self::TimelineDependent(timeline) => Some(timeline),
62            Self::TimestampIndependent | Self::TimestampDependent => None,
63        }
64    }
65}
66
/// Global state for a single timeline.
///
/// For each timeline we maintain a timestamp oracle, which is responsible for
/// providing read (and sometimes write) timestamps, and a set of read holds which
/// guarantee that those read timestamps are valid.
pub(crate) struct TimelineState<T: TimelyTimestamp> {
    // The timestamp oracle for this timeline, shared behind an `Arc` so it can
    // be handed out to concurrent tasks.
    pub(crate) oracle: Arc<dyn TimestampOracle<T> + Send + Sync>,
    // Read holds that guarantee the oracle's read timestamps remain valid.
    pub(crate) read_holds: ReadHolds<T>,
}
76
77impl<T: TimelyTimestamp> fmt::Debug for TimelineState<T> {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        f.debug_struct("TimelineState")
80            .field("read_holds", &self.read_holds)
81            .finish()
82    }
83}
84
85impl Coordinator {
86    pub(crate) fn now(&self) -> EpochMillis {
87        (self.catalog().config().now)()
88    }
89
90    pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
91        to_datetime(self.now())
92    }
93
94    pub(crate) fn get_timestamp_oracle(
95        &self,
96        timeline: &Timeline,
97    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
98        let oracle = &self
99            .global_timelines
100            .get(timeline)
101            .expect("all timelines have a timestamp oracle")
102            .oracle;
103
104        Arc::clone(oracle)
105    }
106
107    /// Returns a [`TimestampOracle`] used for reads and writes from/to a local input.
108    pub(crate) fn get_local_timestamp_oracle(
109        &self,
110    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
111        self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
112    }
113
114    /// Assign a timestamp for a read from a local input. Reads following writes
115    /// must be at a time >= the write's timestamp; we choose "equal to" for
116    /// simplicity's sake and to open as few new timestamps as possible.
117    pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
118        self.get_local_timestamp_oracle().read_ts().await
119    }
120
121    /// Assign a timestamp for a write to a local input and increase the local ts.
122    /// Writes following reads must ensure that they are assigned a strictly larger
123    /// timestamp to ensure they are not visible to any real-time earlier reads.
124    #[instrument(name = "coord::get_local_write_ts")]
125    pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
126        self.global_timelines
127            .get_mut(&Timeline::EpochMilliseconds)
128            .expect("no realtime timeline")
129            .oracle
130            .write_ts()
131            .await
132    }
133
134    /// Peek the current timestamp used for operations on local inputs. Used to determine how much
135    /// to block group commits by.
136    pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
137        self.get_local_timestamp_oracle().peek_write_ts().await
138    }
139
140    /// Marks a write at `timestamp` as completed, using a [`TimestampOracle`].
141    pub(crate) fn apply_local_write(
142        &self,
143        timestamp: Timestamp,
144    ) -> impl Future<Output = ()> + Send + 'static {
145        let now = self.now().into();
146
147        let upper_bound = upper_bound(&now);
148        if timestamp > upper_bound {
149            error!(
150                %now,
151                "Setting local read timestamp to {timestamp}, which is more than \
152                the desired upper bound {upper_bound}."
153            );
154        }
155
156        let oracle = self.get_local_timestamp_oracle();
157
158        async move {
159            oracle
160                .apply_write(timestamp)
161                .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
162                .await
163        }
164    }
165
166    /// Assign a timestamp for a write to the catalog. This timestamp should have the following
167    /// properties:
168    ///
169    ///   - Monotonically increasing.
170    ///   - Greater than or equal to the current catalog upper.
171    ///   - Greater than the largest write timestamp used in the
172    ///     [epoch millisecond timeline](Timeline::EpochMilliseconds).
173    ///
174    /// In general this is fully satisfied by the getting the current write timestamp in the
175    /// [epoch millisecond timeline](Timeline::EpochMilliseconds) from the timestamp oracle,
176    /// however, in read-only mode we cannot modify the timestamp oracle.
177    pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
178        if self.read_only_controllers {
179            let (write_ts, upper) =
180                futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
181                    .await;
182            std::cmp::max(write_ts, upper)
183        } else {
184            self.get_local_write_ts().await.timestamp
185        }
186    }
187
188    /// Ensures that a global timeline state exists for `timeline`.
189    pub(crate) async fn ensure_timeline_state<'a>(
190        &'a mut self,
191        timeline: &'a Timeline,
192    ) -> &'a mut TimelineState<Timestamp> {
193        Self::ensure_timeline_state_with_initial_time(
194            timeline,
195            Timestamp::minimum(),
196            self.catalog().config().now.clone(),
197            self.timestamp_oracle_config.clone(),
198            &mut self.global_timelines,
199            self.read_only_controllers,
200        )
201        .await
202    }
203
204    /// Ensures that a global timeline state exists for `timeline`, with an initial time
205    /// of `initially`.
206    #[instrument]
207    pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
208        timeline: &'a Timeline,
209        initially: Timestamp,
210        now: NowFn,
211        oracle_config: Option<TimestampOracleConfig>,
212        global_timelines: &'a mut BTreeMap<Timeline, TimelineState<Timestamp>>,
213        read_only: bool,
214    ) -> &'a mut TimelineState<Timestamp> {
215        if !global_timelines.contains_key(timeline) {
216            info!("opening a new TimestampOracle for timeline {:?}", timeline,);
217
218            let now_fn = if timeline == &Timeline::EpochMilliseconds {
219                now
220            } else {
221                // Timelines that are not `EpochMilliseconds` don't have an
222                // "external" clock that wants to drive forward timestamps in
223                // addition to the rule that write timestamps must be strictly
224                // monotonically increasing.
225                //
226                // Passing in a clock that always yields the minimum takes the
227                // clock out of the equation and makes timestamps advance only
228                // by the rule about strict monotonicity mentioned above.
229                NowFn::from(|| Timestamp::minimum().into())
230            };
231
232            let oracle_config = oracle_config.expect(
233                "missing --timestamp-oracle-url even though the timestamp oracle was configured",
234            );
235
236            let oracle = oracle_config
237                .open(timeline.to_string(), initially, now_fn, read_only)
238                .await;
239
240            let batching_oracle = BatchingTimestampOracle::new(oracle_config.metrics(), oracle);
241
242            let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
243                Arc::new(batching_oracle);
244
245            global_timelines.insert(
246                timeline.clone(),
247                TimelineState {
248                    oracle,
249                    read_holds: ReadHolds::new(),
250                },
251            );
252        }
253        global_timelines.get_mut(timeline).expect("inserted above")
254    }
255
256    /// Given a [`Timeline`] and a [`CollectionIdBundle`], removes all of the "storage ids"
257    /// and "compute ids" in the bundle, from the timeline.
258    pub(crate) fn remove_resources_associated_with_timeline(
259        &mut self,
260        timeline: Timeline,
261        ids: CollectionIdBundle,
262    ) -> bool {
263        let TimelineState { read_holds, .. } = self
264            .global_timelines
265            .get_mut(&timeline)
266            .expect("all timeslines have a timestamp oracle");
267
268        // Remove all of the underlying resources.
269        for id in ids.storage_ids {
270            read_holds.remove_storage_collection(id);
271        }
272        for (compute_id, ids) in ids.compute_ids {
273            for id in ids {
274                read_holds.remove_compute_collection(compute_id, id);
275            }
276        }
277        let became_empty = read_holds.is_empty();
278
279        became_empty
280    }
281
282    pub(crate) fn remove_compute_ids_from_timeline<I>(&mut self, ids: I) -> Vec<Timeline>
283    where
284        I: IntoIterator<Item = (ComputeInstanceId, GlobalId)>,
285    {
286        let mut empty_timelines = BTreeSet::new();
287        for (compute_instance, id) in ids {
288            for (timeline, TimelineState { read_holds, .. }) in &mut self.global_timelines {
289                read_holds.remove_compute_collection(compute_instance, id);
290                if read_holds.is_empty() {
291                    empty_timelines.insert(timeline.clone());
292                }
293            }
294        }
295        empty_timelines.into_iter().collect()
296    }
297
298    #[instrument(level = "debug")]
299    pub(crate) async fn advance_timelines(&mut self) {
300        let global_timelines = std::mem::take(&mut self.global_timelines);
301        for (
302            timeline,
303            TimelineState {
304                oracle,
305                mut read_holds,
306            },
307        ) in global_timelines
308        {
309            // Timeline::EpochMilliseconds is advanced in group commits and doesn't need to be
310            // manually advanced here.
311            if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
312                // For non realtime sources, we define now as the largest timestamp, not in
313                // advance of any object's upper. This is the largest timestamp that is closed
314                // to writes.
315                let id_bundle = self.catalog().ids_in_timeline(&timeline);
316
317                // Advance the timeline if-and-only-if there are objects in it.
318                // Otherwise we'd advance to the empty frontier, meaning we
319                // close it off for ever.
320                if !id_bundle.is_empty() {
321                    let least_valid_write = self.least_valid_write(&id_bundle);
322                    let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
323                    oracle.apply_write(now).await;
324                    debug!(
325                        least_valid_write = ?least_valid_write,
326                        oracle_read_ts = ?oracle.read_ts().await,
327                        "advanced {:?} to {}",
328                        timeline,
329                        now,
330                    );
331                }
332            };
333            let read_ts = oracle.read_ts().await;
334            read_holds.downgrade(read_ts);
335            self.global_timelines
336                .insert(timeline, TimelineState { oracle, read_holds });
337        }
338    }
339}
340
341/// Convenience function for calculating the current upper bound that we want to
342/// prevent the global timestamp from exceeding.
343fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
344    const TIMESTAMP_INTERVAL_MS: u64 = 5000;
345    const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
346
347    now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
348}
349
350/// Return the set of ids in a timedomain and verify timeline correctness.
351///
352/// When a user starts a transaction, we need to prevent compaction of anything
353/// they might read from. We use a heuristic of "anything in the same database
354/// schemas with the same timeline as whatever the first query is".
355///
356/// This is a free-standing function that can be called from both the old peek sequencing
357/// and the new frontend peek sequencing.
358///
359/// This function assumes that uses_ids only includes such ids that are the latest versions of each
360/// object. This should be easy to satisfy when calling this function with the ids directly
361/// referenced by a new query, because a new query should not be able to refer to old versions of
362/// objects.
363pub(crate) fn timedomain_for<'a, I>(
364    catalog: &Catalog,
365    dataflow_builder: &DataflowBuilder,
366    uses_ids: I,
367    timeline_context: &TimelineContext,
368    conn_id: &ConnectionId,
369    compute_instance: ComputeInstanceId,
370) -> Result<CollectionIdBundle, AdapterError>
371where
372    I: IntoIterator<Item = &'a GlobalId>,
373{
374    // This is just for the assert below.
375    let mut orig_uses_ids = Vec::new();
376
377    // Gather all the used schemas.
378    let mut schemas = BTreeSet::new();
379    for id in uses_ids {
380        orig_uses_ids.push(id.clone());
381
382        let entry = catalog.get_entry_by_global_id(id);
383        let name = entry.name();
384        schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
385    }
386
387    let pg_catalog_schema = (
388        ResolvedDatabaseSpecifier::Ambient,
389        SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
390    );
391    let system_schemas: Vec<_> = catalog
392        .system_schema_ids()
393        .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
394        .collect();
395
396    if system_schemas.iter().any(|s| schemas.contains(s)) {
397        // If any of the system schemas is specified, add the rest of the
398        // system schemas.
399        schemas.extend(system_schemas);
400    } else if !schemas.is_empty() {
401        // Always include the pg_catalog schema, if schemas is non-empty. The pg_catalog schemas is
402        // sometimes used by applications in followup queries.
403        schemas.insert(pg_catalog_schema);
404    }
405
406    // Gather the IDs of all items in all used schemas.
407    let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
408    for (db, schema) in schemas {
409        let schema = catalog.get_schema(&db, &schema, conn_id);
410        // Note: We include just the latest `GlobalId` instead of all `GlobalId`s associated
411        // with an object, because older versions will already get included, if there are
412        // objects the depend on them.
413        let global_ids = schema
414            .items
415            .values()
416            .map(|item_id| catalog.get_entry(item_id).latest_global_id());
417        collection_ids.extend(global_ids);
418    }
419
420    {
421        // Assert that we got back a superset of the original ids.
422        // This should be true, because the query is able to directly reference only the latest
423        // version of each object.
424        for id in orig_uses_ids.iter() {
425            soft_assert_or_log!(
426                collection_ids.contains(id),
427                "timedomain_for is about to miss {}",
428                id
429            );
430        }
431    }
432
433    // Gather the dependencies of those items.
434    let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids);
435
436    // Filter out ids from different timelines.
437    for ids in [
438        &mut id_bundle.storage_ids,
439        &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
440    ] {
441        ids.retain(|gid| {
442            let id_timeline_context = catalog
443                .validate_timeline_context(vec![*gid])
444                .expect("single id should never fail");
445            match (&id_timeline_context, &timeline_context) {
446                // If this id doesn't have a timeline, we can keep it.
447                (
448                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
449                    _,
450                ) => true,
451                // If there's no source timeline, we have the option to opt into a timeline,
452                // so optimistically choose epoch ms. This is useful when the first query in a
453                // transaction is on a static view.
454                (
455                    TimelineContext::TimelineDependent(id_timeline),
456                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
457                ) => *id_timeline == Timeline::EpochMilliseconds,
458                // Otherwise check if timelines are the same.
459                (
460                    TimelineContext::TimelineDependent(id_timeline),
461                    TimelineContext::TimelineDependent(source_timeline),
462                ) => id_timeline == source_timeline,
463            }
464        });
465    }
466
467    Ok(id_bundle)
468}