mz_adapter/catalog/timeline.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to timelines.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use itertools::Itertools;
15use mz_catalog::memory::objects::{CatalogItem, ContinualTask, MaterializedView, View};
16use mz_expr::CollectionPlan;
17use mz_ore::collections::CollectionExt;
18use mz_repr::{CatalogItemId, GlobalId};
19use mz_storage_types::sources::Timeline;
20
21use crate::catalog::Catalog;
22use crate::{AdapterError, CollectionIdBundle, TimelineContext};
23
24impl Catalog {
25 /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists.
26 pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext {
27 let entry = self.get_entry(&id);
28 self.validate_timeline_context(entry.global_ids())
29 .expect("impossible for a single object to belong to incompatible timeline contexts")
30 }
31
32 /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists.
33 pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext {
34 self.validate_timeline_context(vec![id])
35 .expect("impossible for a single object to belong to incompatible timeline contexts")
36 }
37
38 /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id
39 /// belongs to.
40 pub fn partition_ids_by_timeline_context(
41 &self,
42 id_bundle: &CollectionIdBundle,
43 ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)> + use<> {
44 let mut res: BTreeMap<TimelineContext, CollectionIdBundle> = BTreeMap::new();
45
46 for gid in &id_bundle.storage_ids {
47 let timeline_context = self.get_timeline_context_for_global_id(*gid);
48 res.entry(timeline_context)
49 .or_default()
50 .storage_ids
51 .insert(*gid);
52 }
53
54 for (compute_instance, ids) in &id_bundle.compute_ids {
55 for gid in ids {
56 let timeline_context = self.get_timeline_context_for_global_id(*gid);
57 res.entry(timeline_context)
58 .or_default()
59 .compute_ids
60 .entry(*compute_instance)
61 .or_default()
62 .insert(*gid);
63 }
64 }
65
66 res.into_iter()
67 }
68
69 /// Returns an id bundle containing all the ids in the give timeline.
70 pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle {
71 let mut id_bundle = CollectionIdBundle::default();
72 for entry in self.entries() {
73 if let TimelineContext::TimelineDependent(entry_timeline) =
74 self.get_timeline_context(entry.id())
75 {
76 if timeline == &entry_timeline {
77 match entry.item() {
78 CatalogItem::Table(table) => {
79 id_bundle.storage_ids.extend(table.global_ids());
80 }
81 CatalogItem::Source(source) => {
82 id_bundle.storage_ids.insert(source.global_id());
83 }
84 CatalogItem::MaterializedView(mv) => {
85 id_bundle.storage_ids.insert(mv.global_id());
86 }
87 CatalogItem::ContinualTask(ct) => {
88 id_bundle.storage_ids.insert(ct.global_id());
89 }
90 CatalogItem::Index(index) => {
91 id_bundle
92 .compute_ids
93 .entry(index.cluster_id)
94 .or_default()
95 .insert(index.global_id());
96 }
97 CatalogItem::View(_)
98 | CatalogItem::Sink(_)
99 | CatalogItem::Type(_)
100 | CatalogItem::Func(_)
101 | CatalogItem::Secret(_)
102 | CatalogItem::Connection(_)
103 | CatalogItem::Log(_) => {}
104 }
105 }
106 }
107 }
108 id_bundle
109 }
110
111 /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should
112 /// be used to prevent users from doing things that are either meaningless
113 /// (joining data from timelines that have similar numbers with different
114 /// meanings like two separate debezium topics) or will never complete (joining
115 /// cdcv2 and realtime data).
116 pub(crate) fn validate_timeline_context<I>(
117 &self,
118 ids: I,
119 ) -> Result<TimelineContext, AdapterError>
120 where
121 I: IntoIterator<Item = GlobalId>,
122 {
123 let items_ids = ids
124 .into_iter()
125 .filter_map(|gid| self.try_resolve_item_id(&gid));
126 let mut timeline_contexts: Vec<_> =
127 self.get_timeline_contexts(items_ids).into_iter().collect();
128 // If there's more than one timeline, we will not produce meaningful
129 // data to a user. Take, for example, some realtime source and a debezium
130 // consistency topic source. The realtime source uses something close to now
131 // for its timestamps. The debezium source starts at 1 and increments per
132 // transaction. We don't want to choose some timestamp that is valid for both
133 // of these because the debezium source will never get to the same value as the
134 // realtime source's "milliseconds since Unix epoch" value. And even if it did,
135 // it's not meaningful to join just because those two numbers happen to be the
136 // same now.
137 //
138 // Another example: assume two separate debezium consistency topics. Both
139 // start counting at 1 and thus have similarish numbers that probably overlap
140 // a lot. However it's still not meaningful to join those two at a specific
141 // transaction counter number because those counters are unrelated to the
142 // other.
143 let timelines: Vec<_> = timeline_contexts
144 .extract_if(.., |timeline_context| timeline_context.contains_timeline())
145 .collect();
146
147 // A single or group of objects may contain multiple compatible timeline
148 // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all
149 // types of contexts. We choose the strongest context level to return back.
150 if timelines.len() > 1 {
151 Err(AdapterError::Unsupported(
152 "multiple timelines within one dataflow",
153 ))
154 } else if timelines.len() == 1 {
155 Ok(timelines.into_element())
156 } else if timeline_contexts
157 .iter()
158 .contains(&TimelineContext::TimestampDependent)
159 {
160 Ok(TimelineContext::TimestampDependent)
161 } else {
162 Ok(TimelineContext::TimestampIndependent)
163 }
164 }
165
166 /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist.
167 fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
168 where
169 I: IntoIterator<Item = CatalogItemId>,
170 {
171 let mut seen: BTreeSet<CatalogItemId> = BTreeSet::new();
172 let mut timelines: BTreeSet<TimelineContext> = BTreeSet::new();
173
174 // Recurse through IDs to find all sources and tables, adding new ones to
175 // the set until we reach the bottom.
176 let mut ids: Vec<_> = ids.into_iter().collect();
177 while let Some(id) = ids.pop() {
178 // Protect against possible infinite recursion. Not sure if it's possible, but
179 // a cheap prevention for the future.
180 if !seen.insert(id) {
181 continue;
182 }
183 if let Some(entry) = self.try_get_entry(&id) {
184 match entry.item() {
185 CatalogItem::Source(source) => {
186 timelines
187 .insert(TimelineContext::TimelineDependent(source.timeline.clone()));
188 }
189 CatalogItem::Index(index) => {
190 let on_id = self.resolve_item_id(&index.on);
191 ids.push(on_id);
192 }
193 CatalogItem::View(View { optimized_expr, .. }) => {
194 // If the definition contains a temporal function, the timeline must
195 // be timestamp dependent.
196 if optimized_expr.contains_temporal() {
197 timelines.insert(TimelineContext::TimestampDependent);
198 } else {
199 timelines.insert(TimelineContext::TimestampIndependent);
200 }
201 let item_ids = optimized_expr
202 .depends_on()
203 .into_iter()
204 .map(|gid| self.resolve_item_id(&gid));
205 ids.extend(item_ids);
206 }
207 CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => {
208 // In some cases the timestamp selected may not affect the answer to a
209 // query, but it may affect our ability to query the materialized view.
210 // Materialized views must durably materialize the result of a query, even
211 // for constant queries. If we choose a timestamp larger than the upper,
212 // which represents the current progress of the view, then the query will
213 // need to block and wait for the materialized view to advance.
214 timelines.insert(TimelineContext::TimestampDependent);
215 let item_ids = optimized_expr
216 .depends_on()
217 .into_iter()
218 .map(|gid| self.resolve_item_id(&gid));
219 ids.extend(item_ids);
220 }
221 CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
222 // See comment in MaterializedView
223 timelines.insert(TimelineContext::TimestampDependent);
224 let item_ids = raw_expr
225 .depends_on()
226 .into_iter()
227 .map(|gid| self.resolve_item_id(&gid));
228 ids.extend(item_ids);
229 }
230 CatalogItem::Table(table) => {
231 timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
232 }
233 CatalogItem::Log(_) => {
234 timelines.insert(TimelineContext::TimelineDependent(
235 Timeline::EpochMilliseconds,
236 ));
237 }
238 CatalogItem::Sink(_)
239 | CatalogItem::Type(_)
240 | CatalogItem::Func(_)
241 | CatalogItem::Secret(_)
242 | CatalogItem::Connection(_) => {}
243 }
244 }
245 }
246
247 timelines
248 }
249}