mz_adapter/catalog/
timeline.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to timelines.

12use std::collections::{BTreeMap, BTreeSet};
13
14use itertools::Itertools;
15use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View};
16use mz_expr::CollectionPlan;
17use mz_ore::collections::CollectionExt;
18use mz_repr::{CatalogItemId, GlobalId};
19use mz_storage_types::sources::Timeline;
20
21use crate::catalog::Catalog;
22use crate::{AdapterError, CollectionIdBundle, TimelineContext};
23
24impl Catalog {
25    /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists.
26    pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext {
27        let entry = self.get_entry(&id);
28        self.validate_timeline_context(entry.global_ids())
29            .expect("impossible for a single object to belong to incompatible timeline contexts")
30    }
31
32    /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists.
33    pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext {
34        self.validate_timeline_context(vec![id])
35            .expect("impossible for a single object to belong to incompatible timeline contexts")
36    }
37
38    /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id
39    /// belongs to.
40    pub fn partition_ids_by_timeline_context(
41        &self,
42        id_bundle: &CollectionIdBundle,
43    ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)> + use<> {
44        let mut res: BTreeMap<TimelineContext, CollectionIdBundle> = BTreeMap::new();
45
46        for gid in &id_bundle.storage_ids {
47            let timeline_context = self.get_timeline_context_for_global_id(*gid);
48            res.entry(timeline_context)
49                .or_default()
50                .storage_ids
51                .insert(*gid);
52        }
53
54        for (compute_instance, ids) in &id_bundle.compute_ids {
55            for gid in ids {
56                let timeline_context = self.get_timeline_context_for_global_id(*gid);
57                res.entry(timeline_context)
58                    .or_default()
59                    .compute_ids
60                    .entry(*compute_instance)
61                    .or_default()
62                    .insert(*gid);
63            }
64        }
65
66        res.into_iter()
67    }
68
69    /// Returns an id bundle containing all the ids in the give timeline.
70    pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle {
71        let mut id_bundle = CollectionIdBundle::default();
72        for entry in self.entries() {
73            if let TimelineContext::TimelineDependent(entry_timeline) =
74                self.get_timeline_context(entry.id())
75            {
76                if timeline == &entry_timeline {
77                    match entry.item() {
78                        CatalogItem::Table(table) => {
79                            id_bundle.storage_ids.extend(table.global_ids());
80                        }
81                        CatalogItem::Source(source) => {
82                            id_bundle.storage_ids.insert(source.global_id());
83                        }
84                        CatalogItem::MaterializedView(mv) => {
85                            id_bundle.storage_ids.insert(mv.global_id());
86                        }
87                        CatalogItem::ContinualTask(ct) => {
88                            id_bundle.storage_ids.insert(ct.global_id());
89                        }
90                        CatalogItem::Index(index) => {
91                            id_bundle
92                                .compute_ids
93                                .entry(index.cluster_id)
94                                .or_default()
95                                .insert(index.global_id());
96                        }
97                        CatalogItem::View(_)
98                        | CatalogItem::Sink(_)
99                        | CatalogItem::Type(_)
100                        | CatalogItem::Func(_)
101                        | CatalogItem::Secret(_)
102                        | CatalogItem::Connection(_)
103                        | CatalogItem::Log(_) => {}
104                    }
105                }
106            }
107        }
108        id_bundle
109    }
110
111    /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should
112    /// be used to prevent users from doing things that are either meaningless
113    /// (joining data from timelines that have similar numbers with different
114    /// meanings like two separate debezium topics) or will never complete (joining
115    /// cdcv2 and realtime data).
116    pub(crate) fn validate_timeline_context<I>(
117        &self,
118        ids: I,
119    ) -> Result<TimelineContext, AdapterError>
120    where
121        I: IntoIterator<Item = GlobalId>,
122    {
123        let items_ids = ids
124            .into_iter()
125            .filter_map(|gid| self.try_resolve_item_id(&gid));
126        let mut timeline_contexts: Vec<_> =
127            self.get_timeline_contexts(items_ids).into_iter().collect();
128        // If there's more than one timeline, we will not produce meaningful
129        // data to a user. Take, for example, some realtime source and a debezium
130        // consistency topic source. The realtime source uses something close to now
131        // for its timestamps. The debezium source starts at 1 and increments per
132        // transaction. We don't want to choose some timestamp that is valid for both
133        // of these because the debezium source will never get to the same value as the
134        // realtime source's "milliseconds since Unix epoch" value. And even if it did,
135        // it's not meaningful to join just because those two numbers happen to be the
136        // same now.
137        //
138        // Another example: assume two separate debezium consistency topics. Both
139        // start counting at 1 and thus have similarish numbers that probably overlap
140        // a lot. However it's still not meaningful to join those two at a specific
141        // transaction counter number because those counters are unrelated to the
142        // other.
143        let timelines: Vec<_> = timeline_contexts
144            .extract_if(.., |timeline_context| timeline_context.contains_timeline())
145            .collect();
146
147        // A single or group of objects may contain multiple compatible timeline
148        // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all
149        // types of contexts. We choose the strongest context level to return back.
150        if timelines.len() > 1 {
151            Err(AdapterError::Unsupported(
152                "multiple timelines within one dataflow",
153            ))
154        } else if timelines.len() == 1 {
155            Ok(timelines.into_element())
156        } else if timeline_contexts
157            .iter()
158            .contains(&TimelineContext::TimestampDependent)
159        {
160            Ok(TimelineContext::TimestampDependent)
161        } else {
162            Ok(TimelineContext::TimestampIndependent)
163        }
164    }
165
166    /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist.
167    fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
168    where
169        I: IntoIterator<Item = CatalogItemId>,
170    {
171        let mut seen: BTreeSet<CatalogItemId> = BTreeSet::new();
172        let mut timelines: BTreeSet<TimelineContext> = BTreeSet::new();
173
174        // Recurse through IDs to find all sources and tables, adding new ones to
175        // the set until we reach the bottom.
176        let mut ids: Vec<_> = ids.into_iter().collect();
177        while let Some(id) = ids.pop() {
178            // Protect against possible infinite recursion. Not sure if it's possible, but
179            // a cheap prevention for the future.
180            if !seen.insert(id) {
181                continue;
182            }
183            if let Some(entry) = self.try_get_entry(&id) {
184                match entry.item() {
185                    CatalogItem::Source(source) => {
186                        timelines
187                            .insert(TimelineContext::TimelineDependent(source.timeline.clone()));
188                    }
189                    CatalogItem::Index(index) => {
190                        let on_id = self.resolve_item_id(&index.on);
191                        ids.push(on_id);
192                    }
193                    CatalogItem::View(View { optimized_expr, .. }) => {
194                        // If the definition contains a temporal function, the timeline must
195                        // be timestamp dependent.
196                        if optimized_expr.contains_temporal() {
197                            timelines.insert(TimelineContext::TimestampDependent);
198                        } else {
199                            timelines.insert(TimelineContext::TimestampIndependent);
200                        }
201                        let item_ids = optimized_expr
202                            .depends_on()
203                            .into_iter()
204                            .map(|gid| self.resolve_item_id(&gid));
205                        ids.extend(item_ids);
206                    }
207                    CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => {
208                        // In some cases the timestamp selected may not affect the answer to a
209                        // query, but it may affect our ability to query the materialized view.
210                        // Materialized views must durably materialize the result of a query, even
211                        // for constant queries. If we choose a timestamp larger than the upper,
212                        // which represents the current progress of the view, then the query will
213                        // need to block and wait for the materialized view to advance.
214                        timelines.insert(TimelineContext::TimestampDependent);
215                        let item_ids = optimized_expr
216                            .depends_on()
217                            .into_iter()
218                            .map(|gid| self.resolve_item_id(&gid));
219                        ids.extend(item_ids);
220                    }
221                    CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
222                        // See comment in MaterializedView
223                        timelines.insert(TimelineContext::TimestampDependent);
224                        let item_ids = raw_expr
225                            .depends_on()
226                            .into_iter()
227                            .map(|gid| self.resolve_item_id(&gid));
228                        ids.extend(item_ids);
229                    }
230                    CatalogItem::Table(table) => {
231                        timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
232                    }
233                    CatalogItem::Log(_) => {
234                        timelines.insert(TimelineContext::TimelineDependent(
235                            Timeline::EpochMilliseconds,
236                        ));
237                    }
238                    CatalogItem::Sink(_)
239                    | CatalogItem::Type(_)
240                    | CatalogItem::Func(_)
241                    | CatalogItem::Secret(_)
242                    | CatalogItem::Connection(_) => {}
243                }
244            }
245        }
246
247        timelines
248    }
249}