// File: mz_adapter/src/coord/timeline.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A mechanism to ensure that a sequence of writes and reads proceed correctly through timestamps.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use futures::Future;
18use mz_adapter_types::connection::ConnectionId;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::now::{EpochMillis, NowFn, to_datetime};
21use mz_ore::{instrument, soft_assert_or_log};
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::names::{ResolvedDatabaseSpecifier, SchemaSpecifier};
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::batching_oracle::BatchingTimestampOracle;
26use mz_timestamp_oracle::{self, TimestampOracle, TimestampOracleConfig, WriteTimestamp};
27use timely::progress::Timestamp as _;
28use tracing::{Instrument, debug, error, info};
29
30use crate::AdapterError;
31use crate::catalog::Catalog;
32use crate::coord::Coordinator;
33use crate::coord::id_bundle::CollectionIdBundle;
34use crate::coord::read_policy::ReadHolds;
35use crate::coord::timestamp_selection::TimestampProvider;
36use crate::optimize::dataflows::DataflowBuilder;
37
/// An enum describing whether or not a query belongs to a timeline and whether the query can be
/// affected by the timestamp at which it executes.
//
// NOTE: the variant order is significant for the derived `Ord`/`PartialOrd`
// impls; do not reorder variants without auditing comparisons.
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum TimelineContext {
    /// Can only ever belong to a single specific timeline. The answer will depend on a timestamp
    /// chosen from that specific timeline.
    TimelineDependent(Timeline),
    /// Can belong to any timeline. The answer will depend on a timestamp chosen from some
    /// timeline.
    TimestampDependent,
    /// The answer does not depend on a chosen timestamp.
    TimestampIndependent,
}
51
52impl TimelineContext {
53    /// Whether or not the context contains a timeline.
54    pub fn contains_timeline(&self) -> bool {
55        self.timeline().is_some()
56    }
57
58    /// The timeline belonging to this context, if one exists.
59    pub fn timeline(&self) -> Option<&Timeline> {
60        match self {
61            Self::TimelineDependent(timeline) => Some(timeline),
62            Self::TimestampIndependent | Self::TimestampDependent => None,
63        }
64    }
65}
66
/// Global state for a single timeline.
///
/// For each timeline we maintain a timestamp oracle, which is responsible for
/// providing read (and sometimes write) timestamps, and a set of read holds which
/// guarantee that those read timestamps are valid.
pub(crate) struct TimelineState {
    // Shared handle to the timeline's timestamp oracle; `Arc` so callers can
    // hold onto it without borrowing the coordinator.
    pub(crate) oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync>,
    // Read holds preventing compaction past the oracle's read timestamp.
    pub(crate) read_holds: ReadHolds,
}
76
77impl fmt::Debug for TimelineState {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        f.debug_struct("TimelineState")
80            .field("read_holds", &self.read_holds)
81            .finish()
82    }
83}
84
85impl Coordinator {
86    pub(crate) fn now(&self) -> EpochMillis {
87        (self.catalog().config().now)()
88    }
89
90    pub(crate) fn now_datetime(&self) -> DateTime<Utc> {
91        to_datetime(self.now())
92    }
93
94    pub(crate) fn get_timestamp_oracle(
95        &self,
96        timeline: &Timeline,
97    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
98        let oracle = &self
99            .global_timelines
100            .get(timeline)
101            .expect("all timelines have a timestamp oracle")
102            .oracle;
103
104        Arc::clone(oracle)
105    }
106
107    /// Returns a [`TimestampOracle`] used for reads and writes from/to a local input.
108    pub(crate) fn get_local_timestamp_oracle(
109        &self,
110    ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
111        self.get_timestamp_oracle(&Timeline::EpochMilliseconds)
112    }
113
114    /// Assign a timestamp for a read from a local input. Reads following writes
115    /// must be at a time >= the write's timestamp; we choose "equal to" for
116    /// simplicity's sake and to open as few new timestamps as possible.
117    pub(crate) async fn get_local_read_ts(&self) -> Timestamp {
118        self.get_local_timestamp_oracle().read_ts().await
119    }
120
121    /// Assign a timestamp for a write to a local input and increase the local ts.
122    /// Writes following reads must ensure that they are assigned a strictly larger
123    /// timestamp to ensure they are not visible to any real-time earlier reads.
124    #[instrument(name = "coord::get_local_write_ts")]
125    pub(crate) async fn get_local_write_ts(&mut self) -> WriteTimestamp {
126        self.global_timelines
127            .get_mut(&Timeline::EpochMilliseconds)
128            .expect("no realtime timeline")
129            .oracle
130            .write_ts()
131            .await
132    }
133
134    /// Peek the current timestamp used for operations on local inputs. Used to determine how much
135    /// to block group commits by.
136    pub(crate) async fn peek_local_write_ts(&self) -> Timestamp {
137        self.get_local_timestamp_oracle().peek_write_ts().await
138    }
139
140    /// Marks a write at `timestamp` as completed, using a [`TimestampOracle`].
141    pub(crate) fn apply_local_write(
142        &self,
143        timestamp: Timestamp,
144    ) -> impl Future<Output = ()> + Send + 'static {
145        let now = self.now().into();
146
147        let upper_bound = upper_bound(&now);
148        if timestamp > upper_bound {
149            error!(
150                %now,
151                "Setting local read timestamp to {timestamp}, which is more than \
152                the desired upper bound {upper_bound}."
153            );
154        }
155
156        let oracle = self.get_local_timestamp_oracle();
157
158        async move {
159            oracle
160                .apply_write(timestamp)
161                .instrument(tracing::debug_span!("apply_local_write_static", ?timestamp))
162                .await
163        }
164    }
165
166    /// Assign a timestamp for a write to the catalog. This timestamp should have the following
167    /// properties:
168    ///
169    ///   - Monotonically increasing.
170    ///   - Greater than or equal to the current catalog upper.
171    ///   - Greater than the largest write timestamp used in the
172    ///     [epoch millisecond timeline](Timeline::EpochMilliseconds).
173    ///
174    /// In general this is fully satisfied by the getting the current write timestamp in the
175    /// [epoch millisecond timeline](Timeline::EpochMilliseconds) from the timestamp oracle,
176    /// however, in read-only mode we cannot modify the timestamp oracle.
177    pub(crate) async fn get_catalog_write_ts(&mut self) -> Timestamp {
178        if self.read_only_controllers {
179            let (write_ts, upper) =
180                futures::future::join(self.peek_local_write_ts(), self.catalog().current_upper())
181                    .await;
182            std::cmp::max(write_ts, upper)
183        } else {
184            self.get_local_write_ts().await.timestamp
185        }
186    }
187
188    /// Ensures that a global timeline state exists for `timeline`.
189    pub(crate) async fn ensure_timeline_state<'a>(
190        &'a mut self,
191        timeline: &'a Timeline,
192    ) -> &'a mut TimelineState {
193        Self::ensure_timeline_state_with_initial_time(
194            timeline,
195            Timestamp::minimum(),
196            self.catalog().config().now.clone(),
197            self.timestamp_oracle_config.clone(),
198            &mut self.global_timelines,
199            self.read_only_controllers,
200        )
201        .await
202    }
203
204    /// Ensures that a global timeline state exists for `timeline`, with an initial time
205    /// of `initially`.
206    #[instrument]
207    pub(crate) async fn ensure_timeline_state_with_initial_time<'a>(
208        timeline: &'a Timeline,
209        initially: Timestamp,
210        now: NowFn,
211        oracle_config: Option<TimestampOracleConfig>,
212        global_timelines: &'a mut BTreeMap<Timeline, TimelineState>,
213        read_only: bool,
214    ) -> &'a mut TimelineState {
215        if !global_timelines.contains_key(timeline) {
216            info!("opening a new TimestampOracle for timeline {:?}", timeline,);
217
218            let now_fn = if timeline == &Timeline::EpochMilliseconds {
219                now
220            } else {
221                // Timelines that are not `EpochMilliseconds` don't have an
222                // "external" clock that wants to drive forward timestamps in
223                // addition to the rule that write timestamps must be strictly
224                // monotonically increasing.
225                //
226                // Passing in a clock that always yields the minimum takes the
227                // clock out of the equation and makes timestamps advance only
228                // by the rule about strict monotonicity mentioned above.
229                NowFn::from(|| Timestamp::minimum().into())
230            };
231
232            let oracle_config = oracle_config.expect(
233                "missing --timestamp-oracle-url even though the timestamp oracle was configured",
234            );
235
236            let oracle = oracle_config
237                .open(timeline.to_string(), initially, now_fn, read_only)
238                .await;
239
240            let batching_oracle = BatchingTimestampOracle::new(oracle_config.metrics(), oracle);
241
242            let oracle: Arc<dyn TimestampOracle<mz_repr::Timestamp> + Send + Sync> =
243                Arc::new(batching_oracle);
244
245            global_timelines.insert(
246                timeline.clone(),
247                TimelineState {
248                    oracle,
249                    read_holds: ReadHolds::new(),
250                },
251            );
252        }
253        global_timelines.get_mut(timeline).expect("inserted above")
254    }
255
256    /// Given a [`Timeline`] and a [`CollectionIdBundle`], removes all of the "storage ids"
257    /// and "compute ids" in the bundle, from the timeline.
258    pub(crate) fn remove_resources_associated_with_timeline(
259        &mut self,
260        timeline: Timeline,
261        ids: CollectionIdBundle,
262    ) -> bool {
263        let TimelineState { read_holds, .. } = self
264            .global_timelines
265            .get_mut(&timeline)
266            .expect("all timeslines have a timestamp oracle");
267
268        // Remove all of the underlying resources.
269        for id in ids.storage_ids {
270            read_holds.remove_storage_collection(id);
271        }
272        for (compute_id, ids) in ids.compute_ids {
273            for id in ids {
274                read_holds.remove_compute_collection(compute_id, id);
275            }
276        }
277        let became_empty = read_holds.is_empty();
278
279        became_empty
280    }
281
282    #[instrument(level = "debug")]
283    pub(crate) async fn advance_timelines(&mut self) {
284        let global_timelines = std::mem::take(&mut self.global_timelines);
285        for (
286            timeline,
287            TimelineState {
288                oracle,
289                mut read_holds,
290            },
291        ) in global_timelines
292        {
293            // Timeline::EpochMilliseconds is advanced in group commits and doesn't need to be
294            // manually advanced here.
295            if timeline != Timeline::EpochMilliseconds && !self.read_only_controllers {
296                // For non realtime sources, we define now as the largest timestamp, not in
297                // advance of any object's upper. This is the largest timestamp that is closed
298                // to writes.
299                let id_bundle = self.catalog().ids_in_timeline(&timeline);
300
301                // Advance the timeline if-and-only-if there are objects in it.
302                // Otherwise we'd advance to the empty frontier, meaning we
303                // close it off for ever.
304                if !id_bundle.is_empty() {
305                    let least_valid_write = self.least_valid_write(&id_bundle);
306                    let now = Self::largest_not_in_advance_of_upper(&least_valid_write);
307                    oracle.apply_write(now).await;
308                    debug!(
309                        least_valid_write = ?least_valid_write,
310                        oracle_read_ts = ?oracle.read_ts().await,
311                        "advanced {:?} to {}",
312                        timeline,
313                        now,
314                    );
315                }
316            };
317            let read_ts = oracle.read_ts().await;
318            read_holds.downgrade(read_ts);
319            self.global_timelines
320                .insert(timeline, TimelineState { oracle, read_holds });
321        }
322    }
323}
324
325/// Convenience function for calculating the current upper bound that we want to
326/// prevent the global timestamp from exceeding.
327fn upper_bound(now: &mz_repr::Timestamp) -> mz_repr::Timestamp {
328    const TIMESTAMP_INTERVAL_MS: u64 = 5000;
329    const TIMESTAMP_INTERVAL_UPPER_BOUND: u64 = 2;
330
331    now.saturating_add(TIMESTAMP_INTERVAL_MS * TIMESTAMP_INTERVAL_UPPER_BOUND)
332}
333
/// Return the set of ids in a timedomain and verify timeline correctness.
///
/// When a user starts a transaction, we need to prevent compaction of anything
/// they might read from. We use a heuristic of "anything in the same database
/// schemas with the same timeline as whatever the first query is".
///
/// This is a free-standing function that can be called from both the old peek sequencing
/// and the new frontend peek sequencing.
///
/// This function assumes that uses_ids only includes such ids that are the latest versions of each
/// object. This should be easy to satisfy when calling this function with the ids directly
/// referenced by a new query, because a new query should not be able to refer to old versions of
/// objects.
pub(crate) fn timedomain_for<'a, I>(
    catalog: &Catalog,
    dataflow_builder: &DataflowBuilder,
    uses_ids: I,
    timeline_context: &TimelineContext,
    conn_id: &ConnectionId,
    compute_instance: ComputeInstanceId,
) -> Result<CollectionIdBundle, AdapterError>
where
    I: IntoIterator<Item = &'a GlobalId>,
{
    // This is just for the assert below.
    let mut orig_uses_ids = Vec::new();

    // Gather all the used schemas.
    let mut schemas = BTreeSet::new();
    for id in uses_ids {
        orig_uses_ids.push(id.clone());

        let entry = catalog.get_entry_by_global_id(id);
        let name = entry.name();
        schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
    }

    let pg_catalog_schema = (
        ResolvedDatabaseSpecifier::Ambient,
        SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
    );
    let system_schemas: Vec<_> = catalog
        .system_schema_ids()
        .map(|id| (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)))
        .collect();

    if system_schemas.iter().any(|s| schemas.contains(s)) {
        // If any of the system schemas is specified, add the rest of the
        // system schemas.
        schemas.extend(system_schemas);
    } else if !schemas.is_empty() {
        // Always include the pg_catalog schema, if schemas is non-empty. The pg_catalog schema is
        // sometimes used by applications in followup queries.
        schemas.insert(pg_catalog_schema);
    }

    // Gather the IDs of all items in all used schemas.
    let mut collection_ids: BTreeSet<GlobalId> = BTreeSet::new();
    for (db, schema) in schemas {
        let schema = catalog.get_schema(&db, &schema, conn_id);
        // Note: We include just the latest `GlobalId` instead of all `GlobalId`s associated
        // with an object, because older versions will already get included, if there are
        // objects that depend on them.
        let global_ids = schema
            .items
            .values()
            .map(|item_id| catalog.get_entry(item_id).latest_global_id());
        collection_ids.extend(global_ids);
    }

    {
        // Assert that we got back a superset of the original ids.
        // This should be true, because the query is able to directly reference only the latest
        // version of each object.
        for id in orig_uses_ids.iter() {
            soft_assert_or_log!(
                collection_ids.contains(id),
                "timedomain_for is about to miss {}",
                id
            );
        }
    }

    // Gather the dependencies of those items.
    let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids);

    // Filter out ids from different timelines.
    // Each collection is kept only if its own timeline context is compatible
    // with `timeline_context` (the timeline of the transaction's first query).
    for ids in [
        &mut id_bundle.storage_ids,
        &mut id_bundle.compute_ids.entry(compute_instance).or_default(),
    ] {
        ids.retain(|gid| {
            let id_timeline_context = catalog
                .validate_timeline_context(vec![*gid])
                .expect("single id should never fail");
            match (&id_timeline_context, &timeline_context) {
                // If this id doesn't have a timeline, we can keep it.
                (
                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                    _,
                ) => true,
                // If there's no source timeline, we have the option to opt into a timeline,
                // so optimistically choose epoch ms. This is useful when the first query in a
                // transaction is on a static view.
                (
                    TimelineContext::TimelineDependent(id_timeline),
                    TimelineContext::TimestampIndependent | TimelineContext::TimestampDependent,
                ) => *id_timeline == Timeline::EpochMilliseconds,
                // Otherwise check if timelines are the same.
                (
                    TimelineContext::TimelineDependent(id_timeline),
                    TimelineContext::TimelineDependent(source_timeline),
                ) => id_timeline == source_timeline,
            }
        });
    }

    Ok(id_bundle)
}