mz_adapter/catalog/timeline.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to timelines.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use itertools::Itertools;
15use mz_catalog::memory::objects::{CatalogItem, MaterializedView, View};
16use mz_expr::CollectionPlan;
17use mz_ore::collections::CollectionExt;
18use mz_repr::{CatalogItemId, GlobalId};
19use mz_storage_types::sources::Timeline;
20
21use crate::catalog::Catalog;
22use crate::{AdapterError, CollectionIdBundle, TimelineContext};
23
24impl Catalog {
25 /// Return the [`TimelineContext`] belonging to a [`CatalogItemId`], if one exists.
26 pub(crate) fn get_timeline_context(&self, id: CatalogItemId) -> TimelineContext {
27 let entry = self.get_entry(&id);
28 self.validate_timeline_context(entry.global_ids())
29 .expect("impossible for a single object to belong to incompatible timeline contexts")
30 }
31
32 /// Return the [`TimelineContext`] belonging to a [`GlobalId`], if one exists.
33 pub(crate) fn get_timeline_context_for_global_id(&self, id: GlobalId) -> TimelineContext {
34 self.validate_timeline_context(vec![id])
35 .expect("impossible for a single object to belong to incompatible timeline contexts")
36 }
37
38 /// Returns an iterator that partitions an id bundle by the [`TimelineContext`] that each id
39 /// belongs to.
40 pub fn partition_ids_by_timeline_context(
41 &self,
42 id_bundle: &CollectionIdBundle,
43 ) -> impl Iterator<Item = (TimelineContext, CollectionIdBundle)> + use<> {
44 let mut res: BTreeMap<TimelineContext, CollectionIdBundle> = BTreeMap::new();
45
46 for gid in &id_bundle.storage_ids {
47 let timeline_context = self.get_timeline_context_for_global_id(*gid);
48 res.entry(timeline_context)
49 .or_default()
50 .storage_ids
51 .insert(*gid);
52 }
53
54 for (compute_instance, ids) in &id_bundle.compute_ids {
55 for gid in ids {
56 let timeline_context = self.get_timeline_context_for_global_id(*gid);
57 res.entry(timeline_context)
58 .or_default()
59 .compute_ids
60 .entry(*compute_instance)
61 .or_default()
62 .insert(*gid);
63 }
64 }
65
66 res.into_iter()
67 }
68
69 /// Returns an id bundle containing all the ids in the give timeline.
70 pub(crate) fn ids_in_timeline(&self, timeline: &Timeline) -> CollectionIdBundle {
71 let mut id_bundle = CollectionIdBundle::default();
72 for entry in self.entries() {
73 if let TimelineContext::TimelineDependent(entry_timeline) =
74 self.get_timeline_context(entry.id())
75 {
76 if timeline == &entry_timeline {
77 match entry.item() {
78 CatalogItem::Table(table) => {
79 id_bundle.storage_ids.extend(table.global_ids());
80 }
81 CatalogItem::Source(source) => {
82 id_bundle.storage_ids.insert(source.global_id());
83 }
84 CatalogItem::MaterializedView(mv) => {
85 id_bundle.storage_ids.insert(mv.global_id_writes());
86 }
87 CatalogItem::Index(index) => {
88 id_bundle
89 .compute_ids
90 .entry(index.cluster_id)
91 .or_default()
92 .insert(index.global_id());
93 }
94 CatalogItem::View(_)
95 | CatalogItem::Sink(_)
96 | CatalogItem::Type(_)
97 | CatalogItem::Func(_)
98 | CatalogItem::Secret(_)
99 | CatalogItem::Connection(_)
100 | CatalogItem::Log(_) => {}
101 }
102 }
103 }
104 }
105 id_bundle
106 }
107
108 /// Return an error if the ids are from incompatible [`TimelineContext`]s. This should
109 /// be used to prevent users from doing things that are either meaningless
110 /// (joining data from timelines that have similar numbers with different
111 /// meanings like two separate debezium topics) or will never complete (joining
112 /// cdcv2 and realtime data).
113 pub(crate) fn validate_timeline_context<I>(
114 &self,
115 ids: I,
116 ) -> Result<TimelineContext, AdapterError>
117 where
118 I: IntoIterator<Item = GlobalId>,
119 {
120 let items_ids = ids
121 .into_iter()
122 .filter_map(|gid| self.try_resolve_item_id(&gid));
123 let mut timeline_contexts: Vec<_> =
124 self.get_timeline_contexts(items_ids).into_iter().collect();
125 // If there's more than one timeline, we will not produce meaningful
126 // data to a user. Take, for example, some realtime source and a debezium
127 // consistency topic source. The realtime source uses something close to now
128 // for its timestamps. The debezium source starts at 1 and increments per
129 // transaction. We don't want to choose some timestamp that is valid for both
130 // of these because the debezium source will never get to the same value as the
131 // realtime source's "milliseconds since Unix epoch" value. And even if it did,
132 // it's not meaningful to join just because those two numbers happen to be the
133 // same now.
134 //
135 // Another example: assume two separate debezium consistency topics. Both
136 // start counting at 1 and thus have similarish numbers that probably overlap
137 // a lot. However it's still not meaningful to join those two at a specific
138 // transaction counter number because those counters are unrelated to the
139 // other.
140 let timelines: Vec<_> = timeline_contexts
141 .extract_if(.., |timeline_context| timeline_context.contains_timeline())
142 .collect();
143
144 // A single or group of objects may contain multiple compatible timeline
145 // contexts. For example `SELECT *, 1, mz_now() FROM t` will contain all
146 // types of contexts. We choose the strongest context level to return back.
147 if timelines.len() > 1 {
148 Err(AdapterError::Unsupported(
149 "multiple timelines within one dataflow",
150 ))
151 } else if timelines.len() == 1 {
152 Ok(timelines.into_element())
153 } else if timeline_contexts
154 .iter()
155 .contains(&TimelineContext::TimestampDependent)
156 {
157 Ok(TimelineContext::TimestampDependent)
158 } else {
159 Ok(TimelineContext::TimestampIndependent)
160 }
161 }
162
163 /// Return the [`TimelineContext`]s belonging to a list of [`CatalogItemId`]s, if any exist.
164 fn get_timeline_contexts<I>(&self, ids: I) -> BTreeSet<TimelineContext>
165 where
166 I: IntoIterator<Item = CatalogItemId>,
167 {
168 let mut seen: BTreeSet<CatalogItemId> = BTreeSet::new();
169 let mut timelines: BTreeSet<TimelineContext> = BTreeSet::new();
170
171 // Recurse through IDs to find all sources and tables, adding new ones to
172 // the set until we reach the bottom.
173 let mut ids: Vec<_> = ids.into_iter().collect();
174 while let Some(id) = ids.pop() {
175 // Protect against possible infinite recursion. Not sure if it's possible, but
176 // a cheap prevention for the future.
177 if !seen.insert(id) {
178 continue;
179 }
180 if let Some(entry) = self.try_get_entry(&id) {
181 match entry.item() {
182 CatalogItem::Source(source) => {
183 timelines
184 .insert(TimelineContext::TimelineDependent(source.timeline.clone()));
185 }
186 CatalogItem::Index(index) => {
187 let on_id = self.resolve_item_id(&index.on);
188 ids.push(on_id);
189 }
190 CatalogItem::View(View {
191 locally_optimized_expr: optimized_expr,
192 ..
193 }) => {
194 // If the definition contains a temporal function, the timeline must
195 // be timestamp dependent.
196 if optimized_expr.contains_temporal() {
197 timelines.insert(TimelineContext::TimestampDependent);
198 } else {
199 timelines.insert(TimelineContext::TimestampIndependent);
200 }
201 let item_ids = optimized_expr
202 .depends_on()
203 .into_iter()
204 .map(|gid| self.resolve_item_id(&gid));
205 ids.extend(item_ids);
206 }
207 CatalogItem::MaterializedView(MaterializedView {
208 locally_optimized_expr: optimized_expr,
209 ..
210 }) => {
211 // In some cases the timestamp selected may not affect the answer to a
212 // query, but it may affect our ability to query the materialized view.
213 // Materialized views must durably materialize the result of a query, even
214 // for constant queries. If we choose a timestamp larger than the upper,
215 // which represents the current progress of the view, then the query will
216 // need to block and wait for the materialized view to advance.
217 timelines.insert(TimelineContext::TimestampDependent);
218 let item_ids = optimized_expr
219 .depends_on()
220 .into_iter()
221 .map(|gid| self.resolve_item_id(&gid));
222 ids.extend(item_ids);
223 }
224 CatalogItem::Table(table) => {
225 timelines.insert(TimelineContext::TimelineDependent(table.timeline()));
226 }
227 CatalogItem::Log(_) => {
228 timelines.insert(TimelineContext::TimelineDependent(
229 Timeline::EpochMilliseconds,
230 ));
231 }
232 CatalogItem::Sink(_)
233 | CatalogItem::Type(_)
234 | CatalogItem::Func(_)
235 | CatalogItem::Secret(_)
236 | CatalogItem::Connection(_) => {}
237 }
238 }
239 }
240
241 timelines
242 }
243}