1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14
15use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
16use mz_ore::soft_assert_or_log;
17use mz_repr::refresh_schedule::RefreshSchedule;
18use mz_repr::{GlobalId, SqlRelationType};
19use mz_storage_types::time_dependence::TimeDependence;
20use serde::{Deserialize, Serialize};
21use timely::progress::Antichain;
22
23use crate::plan::Plan;
24use crate::plan::render_plan::RenderPlan;
25use crate::sinks::{ComputeSinkConnection, ComputeSinkDesc};
26use crate::sources::{SourceInstanceArguments, SourceInstanceDesc};
27
/// A description of a dataflow to construct: objects to build, sources and
/// indexes to import, and indexes and sinks to export.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
    /// Sources instantiated in the dataflow, keyed by source id.
    /// The `bool` records whether the source is known to be monotonic; the
    /// `Antichain<T>` is an upper frontier recorded for the import
    /// (cleared in `as_comparable`, so it is not part of dataflow identity —
    /// NOTE(review): presumably a write/upper frontier; confirm at call sites).
    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>)>,
    /// Indexes imported into the dataflow, keyed by index id.
    pub index_imports: BTreeMap<GlobalId, IndexImport>,
    /// Objects to build as part of the dataflow.
    pub objects_to_build: Vec<BuildDesc<P>>,
    /// Indexes the dataflow exports: index description plus the type of the
    /// indexed collection.
    pub index_exports: BTreeMap<GlobalId, (IndexDesc, SqlRelationType)>,
    /// Sinks the dataflow exports.
    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
    /// Frontier as of which the dataflow should produce output;
    /// `None` until an as-of has been determined (see `set_as_of`).
    pub as_of: Option<Antichain<T>>,
    /// Frontier beyond which the dataflow need not produce output.
    pub until: Antichain<T>,
    /// The initial storage as-of, if one was recorded (see `set_initial_as_of`).
    pub initial_storage_as_of: Option<Antichain<T>>,
    /// Optional schedule governing when the dataflow's output is refreshed.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Human-readable name, used for debugging and logging.
    pub debug_name: String,
    /// How the dataflow's progress relates to wall-clock time, if known.
    pub time_dependence: Option<TimeDependence>,
}
69
70impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
71 pub fn is_single_time(&self) -> bool {
76 let until = &self.until;
80
81 let Some(as_of) = self.as_of.as_ref() else {
83 return false;
84 };
85 soft_assert_or_log!(
87 timely::PartialOrder::less_equal(as_of, until),
88 "expected empty `as_of ≤ until`, got `{as_of:?} ≰ {until:?}`",
89 );
90 let Some(as_of) = as_of.as_option() else {
92 return false;
93 };
94 soft_assert_or_log!(
96 as_of != &mz_repr::Timestamp::MAX || until.is_empty(),
97 "expected `until = {{}}` due to `as_of = MAX`, got `until = {until:?}`",
98 );
99 as_of.try_step_forward().as_ref() == until.as_option()
102 }
103}
104
105impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
106 pub fn check_invariants(&self) -> Result<(), String> {
108 let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect();
109 let mut lir_ids = BTreeSet::new();
110
111 while let Some(plan) = plans.pop() {
112 let lir_id = plan.lir_id;
113 if !lir_ids.insert(lir_id) {
114 return Err(format!(
115 "duplicate `LirId` in `DataflowDescription`: {lir_id}"
116 ));
117 }
118 plans.extend(plan.node.children());
119 }
120
121 Ok(())
122 }
123}
124
125impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
126 pub fn import_index(
132 &mut self,
133 id: GlobalId,
134 desc: IndexDesc,
135 typ: SqlRelationType,
136 monotonic: bool,
137 ) {
138 self.index_imports.insert(
139 id,
140 IndexImport {
141 desc,
142 typ,
143 monotonic,
144 },
145 );
146 }
147
148 pub fn import_source(&mut self, id: GlobalId, typ: SqlRelationType, monotonic: bool) {
150 self.source_imports.insert(
153 id,
154 (
155 SourceInstanceDesc {
156 storage_metadata: (),
157 arguments: SourceInstanceArguments { operators: None },
158 typ,
159 },
160 monotonic,
161 Antichain::new(),
162 ),
163 );
164 }
165
    /// Appends an object to build, identified by `id` and described by `plan`.
    pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
        self.objects_to_build.push(BuildDesc { id, plan });
    }
170
171 pub fn export_index(&mut self, id: GlobalId, description: IndexDesc, on_type: SqlRelationType) {
176 self.insert_plan(
179 id,
180 OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
181 input: Box::new(MirRelationExpr::global_get(
182 description.on_id,
183 on_type.clone(),
184 )),
185 keys: vec![description.key.clone()],
186 }),
187 );
188 self.index_exports.insert(id, (description, on_type));
189 }
190
    /// Exports a sink under `id`, described by `description`.
    pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) {
        self.sink_exports.insert(id, description);
    }
195
196 pub fn is_imported(&self, id: &GlobalId) -> bool {
198 self.objects_to_build.iter().any(|bd| &bd.id == id)
199 || self.index_imports.keys().any(|i| i == id)
200 || self.source_imports.keys().any(|i| i == id)
201 }
202
203 pub fn arity_of(&self, id: &GlobalId) -> usize {
205 for (source_id, (source, _monotonic, _upper)) in self.source_imports.iter() {
206 if source_id == id {
207 return source.typ.arity();
208 }
209 }
210 for IndexImport { desc, typ, .. } in self.index_imports.values() {
211 if &desc.on_id == id {
212 return typ.arity();
213 }
214 }
215 for desc in self.objects_to_build.iter() {
216 if &desc.id == id {
217 return desc.plan.arity();
218 }
219 }
220 panic!("GlobalId {} not found in DataflowDesc", id);
221 }
222
223 pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
225 where
226 R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
227 S: Fn(&mut MirScalarExpr) -> Result<(), E>,
228 {
229 for BuildDesc { plan, .. } in &mut self.objects_to_build {
230 r(plan)?;
231 }
232 for (source_instance_desc, _, _upper) in self.source_imports.values_mut() {
233 let Some(mfp) = source_instance_desc.arguments.operators.as_mut() else {
234 continue;
235 };
236 for expr in mfp.expressions.iter_mut() {
237 s(expr)?;
238 }
239 for (_, expr) in mfp.predicates.iter_mut() {
240 s(expr)?;
241 }
242 }
243 Ok(())
244 }
245}
246
247impl<P, S, T> DataflowDescription<P, S, T> {
248 pub fn new(name: String) -> Self {
250 Self {
251 source_imports: Default::default(),
252 index_imports: Default::default(),
253 objects_to_build: Vec::new(),
254 index_exports: Default::default(),
255 sink_exports: Default::default(),
256 as_of: Default::default(),
257 until: Antichain::new(),
258 initial_storage_as_of: None,
259 refresh_schedule: None,
260 debug_name: name,
261 time_dependence: None,
262 }
263 }
264
    /// Sets the dataflow's `as_of` frontier.
    pub fn set_as_of(&mut self, as_of: Antichain<T>) {
        self.as_of = Some(as_of);
    }
291
    /// Records the initial storage as-of of the dataflow
    /// (see the `initial_storage_as_of` field).
    pub fn set_initial_as_of(&mut self, initial_as_of: Antichain<T>) {
        self.initial_storage_as_of = Some(initial_as_of);
    }
296
    /// The ids of all imports (indexes first, then sources) of this dataflow.
    pub fn import_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.imported_index_ids().chain(self.imported_source_ids())
    }
301
    /// The ids of all indexes imported by this dataflow.
    pub fn imported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.index_imports.keys().copied()
    }
306
    /// The ids of all sources imported by this dataflow.
    pub fn imported_source_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.source_imports.keys().copied()
    }
311
    /// The ids of all exports (indexes first, then sinks) of this dataflow.
    pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.exported_index_ids().chain(self.exported_sink_ids())
    }
316
    /// The ids of all indexes exported by this dataflow.
    pub fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.index_exports.keys().copied()
    }
321
    /// The ids of all sinks exported by this dataflow.
    pub fn exported_sink_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
        self.sink_exports.keys().copied()
    }
326
327 pub fn persist_sink_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
329 self.sink_exports
330 .iter()
331 .filter_map(|(id, desc)| match desc.connection {
332 ComputeSinkConnection::MaterializedView(_) => Some(*id),
333 ComputeSinkConnection::ContinualTask(_) => Some(*id),
334 _ => None,
335 })
336 }
337
338 pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
340 self.sink_exports
341 .iter()
342 .filter_map(|(id, desc)| match desc.connection {
343 ComputeSinkConnection::Subscribe(_) => Some(*id),
344 _ => None,
345 })
346 }
347
348 pub fn continual_task_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
350 self.sink_exports
351 .iter()
352 .filter_map(|(id, desc)| match desc.connection {
353 ComputeSinkConnection::ContinualTask(_) => Some(*id),
354 _ => None,
355 })
356 }
357
358 pub fn copy_to_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
360 self.sink_exports
361 .iter()
362 .filter_map(|(id, desc)| match desc.connection {
363 ComputeSinkConnection::CopyToS3Oneshot(_) => Some(*id),
364 _ => None,
365 })
366 }
367
    /// Returns a displayable, bracketed, comma-separated rendering of the
    /// import ids, e.g. `[u1, u2]`.
    pub fn display_import_ids(&self) -> impl fmt::Display + '_ {
        use mz_ore::str::{bracketed, separated};
        bracketed("[", "]", separated(", ", self.import_ids()))
    }
373
    /// Returns a displayable, bracketed, comma-separated rendering of the
    /// export ids, e.g. `[u1, u2]`.
    pub fn display_export_ids(&self) -> impl fmt::Display + '_ {
        use mz_ore::str::{bracketed, separated};
        bracketed("[", "]", separated(", ", self.export_ids()))
    }
379
    /// Reports whether the dataflow is transient, i.e. all its exports have
    /// transient ids. (Vacuously true if there are no exports.)
    pub fn is_transient(&self) -> bool {
        self.export_ids().all(|id| id.is_transient())
    }
384
385 pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
392 let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
393 let build = builds
394 .next()
395 .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
396 assert!(builds.next().is_none());
397 build
398 }
399}
400
401impl<P, S, T> DataflowDescription<P, S, T>
402where
403 P: CollectionPlan,
404{
    /// Computes the set of identifiers on which the collection `collection_id`
    /// depends within this dataflow, including `collection_id` itself.
    /// Traversal stops at imported sources and indexes
    /// (see `depends_on_into` for details).
    pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
        let mut out = BTreeSet::new();
        self.depends_on_into(collection_id, &mut out);
        out
    }
419
420 pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
422 out.insert(collection_id);
423 if self.source_imports.contains_key(&collection_id) {
424 out.insert(collection_id);
427 return;
428 }
429
430 let mut found_index = false;
434 for (index_id, IndexImport { desc, .. }) in &self.index_imports {
435 if desc.on_id == collection_id {
436 out.insert(*index_id);
439 found_index = true;
440 }
441 }
442 if found_index {
443 return;
444 }
445
446 let build = self.build_desc(collection_id);
449 for id in build.plan.depends_on() {
450 if !out.contains(&id) {
451 self.depends_on_into(id, out)
452 }
453 }
454 }
455
456 pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
461 let is_import = |id: &GlobalId| {
462 self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
463 };
464
465 let deps = self.depends_on(collection_id);
466 deps.into_iter().filter(is_import).collect()
467 }
468}
469
470impl<S, T> DataflowDescription<RenderPlan, S, T>
471where
472 S: Clone + PartialEq,
473 T: Clone + timely::PartialOrder,
474{
475 pub fn compatible_with(&self, other: &Self) -> bool {
486 let old = self.as_comparable();
487 let new = other.as_comparable();
488
489 let equality = old.index_exports == new.index_exports
490 && old.sink_exports == new.sink_exports
491 && old.objects_to_build == new.objects_to_build
492 && old.index_imports == new.index_imports
493 && old.source_imports == new.source_imports
494 && old.time_dependence == new.time_dependence;
495
496 let partial = if let (Some(old_as_of), Some(new_as_of)) = (&old.as_of, &new.as_of) {
497 timely::PartialOrder::less_equal(old_as_of, new_as_of)
498 } else {
499 false
500 };
501
502 equality && partial
503 }
504
    /// Returns a copy of `self` normalized for comparison: source upper
    /// frontiers are cleared and internal transient ids are renamed to a
    /// canonical numbering, so two dataflows that differ only in those
    /// details compare equal.
    fn as_comparable(&self) -> Self {
        // Ids visible outside the dataflow (imports/exports) must be kept
        // stable; only purely internal transient ids may be renamed.
        let external_ids: BTreeSet<_> = self.import_ids().chain(self.export_ids()).collect();

        let mut id_counter = 0;
        let mut replacements = BTreeMap::new();

        // Maps internal transient ids to canonical `Transient(n)` ids,
        // assigning numbers in first-encounter order; other ids pass through.
        let mut maybe_replace = |id: GlobalId| {
            if id.is_transient() && !external_ids.contains(&id) {
                *replacements.entry(id).or_insert_with(|| {
                    id_counter += 1;
                    GlobalId::Transient(id_counter)
                })
            } else {
                id
            }
        };

        // Upper frontiers advance over time and must not affect comparison.
        let mut source_imports = self.source_imports.clone();
        for (_source, _monotonic, upper) in source_imports.values_mut() {
            *upper = Antichain::new();
        }

        // Rename ids in the built objects, both the object ids themselves and
        // any ids referenced inside their plans.
        let mut objects_to_build = self.objects_to_build.clone();
        for object in &mut objects_to_build {
            object.id = maybe_replace(object.id);
            object.plan.replace_ids(&mut maybe_replace);
        }

        // Index exports may reference renamed internal collections.
        let mut index_exports = self.index_exports.clone();
        for (desc, _typ) in index_exports.values_mut() {
            desc.on_id = maybe_replace(desc.on_id);
        }

        // Sink exports likewise reference the collection they sink from.
        let mut sink_exports = self.sink_exports.clone();
        for desc in sink_exports.values_mut() {
            desc.from = maybe_replace(desc.from);
        }

        DataflowDescription {
            source_imports,
            index_imports: self.index_imports.clone(),
            objects_to_build,
            index_exports,
            sink_exports,
            as_of: self.as_of.clone(),
            until: self.until.clone(),
            initial_storage_as_of: self.initial_storage_as_of.clone(),
            refresh_schedule: self.refresh_schedule.clone(),
            debug_name: self.debug_name.clone(),
            time_dependence: self.time_dependence.clone(),
        }
    }
563}
564
/// An association of a global identifier to an expression, over optimized MIR
/// expressions and with default storage metadata and timestamp types.
pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;
567
/// A description of an index: the collection it is on and the key by which
/// that collection is arranged.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
    /// Identity of the collection the index is on.
    pub on_id: GlobalId,
    /// Expressions to be arranged, in order of decreasing significance.
    pub key: Vec<MirScalarExpr>,
}
577
/// An index imported into a dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexImport {
    /// Description of the index: the indexed collection and its key.
    pub desc: IndexDesc,
    /// The type of the indexed collection.
    pub typ: SqlRelationType,
    /// Whether the index is known to be monotonic.
    pub monotonic: bool,
}
588
/// An association of a global identifier to the plan that builds it.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
    /// The id of the object to build.
    pub id: GlobalId,
    /// The plan that produces the object's collection.
    pub plan: P,
}