Skip to main content

mz_adapter/catalog/
timeline.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to timelines.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use itertools::Itertools;
15use mz_catalog::memory::objects::{CatalogItem, MaterializedView, View};
16use mz_expr::CollectionPlan;
17use mz_ore::collections::CollectionExt;
18use mz_repr::{CatalogItemId, GlobalId};
19use mz_storage_types::sources::Timeline;
20
21use crate::catalog::Catalog;
22use crate::{AdapterError, CollectionIdBundle, TimelineContext};
23
24impl Catalog {
25    /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists.
26    pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext {
27        let entry = self.get_entry(&id);
28        self.validate_timeline_context(entry.global_ids())
29            .expect("impossible for a single object to belong to incompatible timeline contexts")
30    }
31
32    /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists.
33    pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext {
34        self.validate_timeline_context(vec![id])
35            .expect("impossible for a single object to belong to incompatible timeline contexts")
36    }
37
38    /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id
39    /// belongs to.
40    pub fn partition_ids_by_timeline_context(
41        &self,
42        id_bundle: &CollectionIdBundle,
43    ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)> + use<> {
44        let mut res: BTreeMap<TimelineContext, CollectionIdBundle> = BTreeMap::new();
45
46        for gid in &id_bundle.storage_ids {
47            let timeline_context = self.get_timeline_context_for_global_id(*gid);
48            res.entry(timeline_context)
49                .or_default()
50                .storage_ids
51                .insert(*gid);
52        }
53
54        for (compute_instance, ids) in &id_bundle.compute_ids {
55            for gid in ids {
56                let timeline_context = self.get_timeline_context_for_global_id(*gid);
57                res.entry(timeline_context)
58                    .or_default()
59                    .compute_ids
60                    .entry(*compute_instance)
61                    .or_default()
62                    .insert(*gid);
63            }
64        }
65
66        res.into_iter()
67    }
68
69    /// Returns an id bundle containing all the ids in the give timeline.
70    pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle {
71        let mut id_bundle = CollectionIdBundle::default();
72        for entry in self.entries() {
73            if let TimelineContext::TimelineDependent(entry_timeline) =
74                self.get_timeline_context(entry.id())
75            {
76                if timeline == &entry_timeline {
77                    match entry.item() {
78                        CatalogItem::Table(table) => {
79                            id_bundle.storage_ids.extend(table.global_ids());
80                        }
81                        CatalogItem::Source(source) => {
82                            id_bundle.storage_ids.insert(source.global_id());
83                        }
84                        CatalogItem::MaterializedView(mv) => {
85                            id_bundle.storage_ids.insert(mv.global_id_writes());
86                        }
87                        CatalogItem::Index(index) => {
88                            id_bundle
89                                .compute_ids
90                                .entry(index.cluster_id)
91                                .or_default()
92                                .insert(index.global_id());
93                        }
94                        CatalogItem::View(_)
95                        | CatalogItem::Sink(_)
96                        | CatalogItem::Type(_)
97                        | CatalogItem::Func(_)
98                        | CatalogItem::Secret(_)
99                        | CatalogItem::Connection(_)
100                        | CatalogItem::Log(_) => {}
101                    }
102                }
103            }
104        }
105        id_bundle
106    }
107
108    /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should
109    /// be used to prevent users from doing things that are either meaningless
110    /// (joining data from timelines that have similar numbers with different
111    /// meanings like two separate debezium topics) or will never complete (joining
112    /// cdcv2 and realtime data).
113    pub(crate) fn validate_timeline_context<I>(
114        &self,
115        ids: I,
116    ) -> Result<TimelineContext, AdapterError>
117    where
118        I: IntoIterator<Item = GlobalId>,
119    {
120        let items_ids = ids
121            .into_iter()
122            .filter_map(|gid| self.try_resolve_item_id(&gid));
123        let mut timeline_contexts: Vec<_> =
124            self.get_timeline_contexts(items_ids).into_iter().collect();
125        // If there's more than one timeline, we will not produce meaningful
126        // data to a user. Take, for example, some realtime source and a debezium
127        // consistency topic source. The realtime source uses something close to now
128        // for its timestamps. The debezium source starts at 1 and increments per
129        // transaction. We don't want to choose some timestamp that is valid for both
130        // of these because the debezium source will never get to the same value as the
131        // realtime source's "milliseconds since Unix epoch" value. And even if it did,
132        // it's not meaningful to join just because those two numbers happen to be the
133        // same now.
134        //
135        // Another example: assume two separate debezium consistency topics. Both
136        // start counting at 1 and thus have similarish numbers that probably overlap
137        // a lot. However it's still not meaningful to join those two at a specific
138        // transaction counter number because those counters are unrelated to the
139        // other.
140        let timelines: Vec<_> = timeline_contexts
141            .extract_if(.., |timeline_context| timeline_context.contains_timeline())
142            .collect();
143
144        // A single or group of objects may contain multiple compatible timeline
145        // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all
146        // types of contexts. We choose the strongest context level to return back.
147        if timelines.len() > 1 {
148            Err(AdapterError::Unsupported(
149                "multiple timelines within one dataflow",
150            ))
151        } else if timelines.len() == 1 {
152            Ok(timelines.into_element())
153        } else if timeline_contexts
154            .iter()
155            .contains(&TimelineContext::TimestampDependent)
156        {
157            Ok(TimelineContext::TimestampDependent)
158        } else {
159            Ok(TimelineContext::TimestampIndependent)
160        }
161    }
162
163    /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist.
164    fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
165    where
166        I: IntoIterator<Item = CatalogItemId>,
167    {
168        let mut seen: BTreeSet<CatalogItemId> = BTreeSet::new();
169        let mut timelines: BTreeSet<TimelineContext> = BTreeSet::new();
170
171        // Recurse through IDs to find all sources and tables, adding new ones to
172        // the set until we reach the bottom.
173        let mut ids: Vec<_> = ids.into_iter().collect();
174        while let Some(id) = ids.pop() {
175            // Protect against possible infinite recursion. Not sure if it's possible, but
176            // a cheap prevention for the future.
177            if !seen.insert(id) {
178                continue;
179            }
180            if let Some(entry) = self.try_get_entry(&id) {
181                match entry.item() {
182                    CatalogItem::Source(source) => {
183                        timelines
184                            .insert(TimelineContext::TimelineDependent(source.timeline.clone()));
185                    }
186                    CatalogItem::Index(index) => {
187                        let on_id = self.resolve_item_id(&index.on);
188                        ids.push(on_id);
189                    }
190                    CatalogItem::View(View {
191                        locally_optimized_expr: optimized_expr,
192                        ..
193                    }) => {
194                        // If the definition contains a temporal function, the timeline must
195                        // be timestamp dependent.
196                        if optimized_expr.contains_temporal() {
197                            timelines.insert(TimelineContext::TimestampDependent);
198                        } else {
199                            timelines.insert(TimelineContext::TimestampIndependent);
200                        }
201                        let item_ids = optimized_expr
202                            .depends_on()
203                            .into_iter()
204                            .map(|gid| self.resolve_item_id(&gid));
205                        ids.extend(item_ids);
206                    }
207                    CatalogItem::MaterializedView(MaterializedView {
208                        locally_optimized_expr: optimized_expr,
209                        ..
210                    }) => {
211                        // In some cases the timestamp selected may not affect the answer to a
212                        // query, but it may affect our ability to query the materialized view.
213                        // Materialized views must durably materialize the result of a query, even
214                        // for constant queries. If we choose a timestamp larger than the upper,
215                        // which represents the current progress of the view, then the query will
216                        // need to block and wait for the materialized view to advance.
217                        timelines.insert(TimelineContext::TimestampDependent);
218                        let item_ids = optimized_expr
219                            .depends_on()
220                            .into_iter()
221                            .map(|gid| self.resolve_item_id(&gid));
222                        ids.extend(item_ids);
223                    }
224                    CatalogItem::Table(table) => {
225                        timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
226                    }
227                    CatalogItem::Log(_) => {
228                        timelines.insert(TimelineContext::TimelineDependent(
229                            Timeline::EpochMilliseconds,
230                        ));
231                    }
232                    CatalogItem::Sink(_)
233                    | CatalogItem::Type(_)
234                    | CatalogItem::Func(_)
235                    | CatalogItem::Secret(_)
236                    | CatalogItem::Connection(_) => {}
237                }
238            }
239        }
240
241        timelines
242    }
243}