1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14
15use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
16use mz_ore::collections::CollectionExt;
17use mz_ore::soft_assert_or_log;
18use mz_repr::refresh_schedule::RefreshSchedule;
19use mz_repr::{GlobalId, SqlRelationType};
20use mz_storage_types::time_dependence::TimeDependence;
21use serde::{Deserialize, Serialize};
22use timely::progress::Antichain;
23
24use crate::plan::Plan;
25use crate::plan::render_plan::RenderPlan;
26use crate::sinks::{ComputeSinkConnection, ComputeSinkDesc};
27use crate::sources::{SourceInstanceArguments, SourceInstanceDesc};
28
/// A description of a dataflow to construct: its imports, the objects to
/// build, and the indexes and sinks through which results are surfaced.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
    /// Sources instantiated in the dataflow, keyed by import ID. The `bool`
    /// is a monotonicity flag recorded at import time (see `import_source`);
    /// the `Antichain<T>` is an upper frontier for the source (cleared during
    /// `as_comparable` normalization so it does not affect comparisons).
    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool, Antichain<T>)>,
    /// Indexes imported by the dataflow, keyed by index ID.
    pub index_imports: BTreeMap<GlobalId, IndexImport>,
    /// The collections this dataflow builds: a plan `P` paired with its ID.
    pub objects_to_build: Vec<BuildDesc<P>>,
    /// Indexes the dataflow exports, with the index description and the
    /// relation type of the indexed collection.
    pub index_exports: BTreeMap<GlobalId, (IndexDesc, SqlRelationType)>,
    /// Sinks the dataflow exports, keyed by sink ID.
    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
    /// The frontier as of which the dataflow produces output; `None` until
    /// assigned via `set_as_of`. Expected to satisfy `as_of ≤ until`
    /// (asserted in `is_single_time`).
    pub as_of: Option<Antichain<T>>,
    /// An upper bound frontier on the times this dataflow handles.
    pub until: Antichain<T>,
    /// The initial storage `as_of`, if any; assigned via `set_initial_as_of`.
    /// NOTE(review): presumably only meaningful for persisted outputs — confirm.
    pub initial_storage_as_of: Option<Antichain<T>>,
    /// An optional refresh schedule for the dataflow's output.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// A human-readable name, used for debugging and display.
    pub debug_name: String,
    /// The dataflow's `TimeDependence`, if determined; compared in
    /// `compatible_with`.
    pub time_dependence: Option<TimeDependence>,
}
70
impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
    /// Reports whether the dataflow produces output for exactly one time,
    /// i.e., whether `until` is exactly the successor of a single `as_of`
    /// time. Returns `false` when `as_of` is unset, empty, or holds more
    /// than one element.
    pub fn is_single_time(&self) -> bool {
        let until = &self.until;

        let Some(as_of) = self.as_of.as_ref() else {
            // Without an `as_of` we cannot conclude single-time-ness.
            return false;
        };
        // Sanity check: `as_of` should never lie beyond `until`.
        soft_assert_or_log!(
            timely::PartialOrder::less_equal(as_of, until),
            "expected empty `as_of ≤ until`, got `{as_of:?} ≰ {until:?}`",
        );
        // `as_option` yields `Some` only for a one-element antichain; an empty
        // or multi-element `as_of` is not a single time.
        let Some(as_of) = as_of.as_option() else {
            return false;
        };
        // The maximum timestamp has no successor, so `until` must be empty.
        soft_assert_or_log!(
            as_of != &mz_repr::Timestamp::MAX || until.is_empty(),
            "expected `until = {{}}` due to `as_of = MAX`, got `until = {until:?}`",
        );
        // Single-time iff `until` is exactly `as_of + 1`.
        as_of.try_step_forward().as_ref() == until.as_option()
    }
}
105
106impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
107 pub fn check_invariants(&self) -> Result<(), String> {
109 let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect();
110 let mut lir_ids = BTreeSet::new();
111
112 while let Some(plan) = plans.pop() {
113 let lir_id = plan.lir_id;
114 if !lir_ids.insert(lir_id) {
115 return Err(format!(
116 "duplicate `LirId` in `DataflowDescription`: {lir_id}"
117 ));
118 }
119 plans.extend(plan.node.children());
120 }
121
122 Ok(())
123 }
124}
125
126impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
127 pub fn import_index(
133 &mut self,
134 id: GlobalId,
135 desc: IndexDesc,
136 typ: SqlRelationType,
137 monotonic: bool,
138 ) {
139 self.index_imports.insert(
140 id,
141 IndexImport {
142 desc,
143 typ,
144 monotonic,
145 },
146 );
147 }
148
149 pub fn import_source(&mut self, id: GlobalId, typ: SqlRelationType, monotonic: bool) {
151 self.source_imports.insert(
154 id,
155 (
156 SourceInstanceDesc {
157 storage_metadata: (),
158 arguments: SourceInstanceArguments { operators: None },
159 typ,
160 },
161 monotonic,
162 Antichain::new(),
163 ),
164 );
165 }
166
167 pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
169 self.objects_to_build.push(BuildDesc { id, plan });
170 }
171
172 pub fn export_index(&mut self, id: GlobalId, description: IndexDesc, on_type: SqlRelationType) {
177 self.insert_plan(
180 id,
181 OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
182 input: Box::new(MirRelationExpr::global_get(
183 description.on_id,
184 on_type.clone(),
185 )),
186 keys: vec![description.key.clone()],
187 }),
188 );
189 self.index_exports.insert(id, (description, on_type));
190 }
191
192 pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) {
194 self.sink_exports.insert(id, description);
195 }
196
197 pub fn is_imported(&self, id: &GlobalId) -> bool {
199 self.objects_to_build.iter().any(|bd| &bd.id == id)
200 || self.index_imports.keys().any(|i| i == id)
201 || self.source_imports.keys().any(|i| i == id)
202 }
203
204 pub fn arity_of(&self, id: &GlobalId) -> usize {
206 for (source_id, (source, _monotonic, _upper)) in self.source_imports.iter() {
207 if source_id == id {
208 return source.typ.arity();
209 }
210 }
211 for IndexImport { desc, typ, .. } in self.index_imports.values() {
212 if &desc.on_id == id {
213 return typ.arity();
214 }
215 }
216 for desc in self.objects_to_build.iter() {
217 if &desc.id == id {
218 return desc.plan.arity();
219 }
220 }
221 panic!("GlobalId {} not found in DataflowDesc", id);
222 }
223
224 pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
226 where
227 R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
228 S: Fn(&mut MirScalarExpr) -> Result<(), E>,
229 {
230 for BuildDesc { plan, .. } in &mut self.objects_to_build {
231 r(plan)?;
232 }
233 for (source_instance_desc, _, _upper) in self.source_imports.values_mut() {
234 let Some(mfp) = source_instance_desc.arguments.operators.as_mut() else {
235 continue;
236 };
237 for expr in mfp.expressions.iter_mut() {
238 s(expr)?;
239 }
240 for (_, expr) in mfp.predicates.iter_mut() {
241 s(expr)?;
242 }
243 }
244 Ok(())
245 }
246}
247
248impl<P, S, T> DataflowDescription<P, S, T> {
249 pub fn new(name: String) -> Self {
251 Self {
252 source_imports: Default::default(),
253 index_imports: Default::default(),
254 objects_to_build: Vec::new(),
255 index_exports: Default::default(),
256 sink_exports: Default::default(),
257 as_of: Default::default(),
258 until: Antichain::new(),
259 initial_storage_as_of: None,
260 refresh_schedule: None,
261 debug_name: name,
262 time_dependence: None,
263 }
264 }
265
266 pub fn set_as_of(&mut self, as_of: Antichain<T>) {
290 self.as_of = Some(as_of);
291 }
292
293 pub fn set_initial_as_of(&mut self, initial_as_of: Antichain<T>) {
295 self.initial_storage_as_of = Some(initial_as_of);
296 }
297
298 pub fn import_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
300 self.imported_index_ids().chain(self.imported_source_ids())
301 }
302
303 pub fn imported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
305 self.index_imports.keys().copied()
306 }
307
308 pub fn imported_source_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
310 self.source_imports.keys().copied()
311 }
312
313 pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
315 self.exported_index_ids().chain(self.exported_sink_ids())
316 }
317
318 pub fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
320 self.index_exports.keys().copied()
321 }
322
323 pub fn exported_sink_ids(&self) -> impl Iterator<Item = GlobalId> + Clone + '_ {
325 self.sink_exports.keys().copied()
326 }
327
328 pub fn persist_sink_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
330 self.sink_exports
331 .iter()
332 .filter_map(|(id, desc)| match desc.connection {
333 ComputeSinkConnection::MaterializedView(_) => Some(*id),
334 ComputeSinkConnection::ContinualTask(_) => Some(*id),
335 _ => None,
336 })
337 }
338
339 pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
341 self.sink_exports
342 .iter()
343 .filter_map(|(id, desc)| match desc.connection {
344 ComputeSinkConnection::Subscribe(_) => Some(*id),
345 _ => None,
346 })
347 }
348
349 pub fn continual_task_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
351 self.sink_exports
352 .iter()
353 .filter_map(|(id, desc)| match desc.connection {
354 ComputeSinkConnection::ContinualTask(_) => Some(*id),
355 _ => None,
356 })
357 }
358
359 pub fn copy_to_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
361 self.sink_exports
362 .iter()
363 .filter_map(|(id, desc)| match desc.connection {
364 ComputeSinkConnection::CopyToS3Oneshot(_) => Some(*id),
365 _ => None,
366 })
367 }
368
369 pub fn display_import_ids(&self) -> impl fmt::Display + '_ {
371 use mz_ore::str::{bracketed, separated};
372 bracketed("[", "]", separated(", ", self.import_ids()))
373 }
374
375 pub fn display_export_ids(&self) -> impl fmt::Display + '_ {
377 use mz_ore::str::{bracketed, separated};
378 bracketed("[", "]", separated(", ", self.export_ids()))
379 }
380
381 pub fn is_transient(&self) -> bool {
383 self.export_ids().all(|id| id.is_transient())
384 }
385
386 pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
393 let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
394 let build = builds
395 .next()
396 .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
397 assert!(builds.next().is_none());
398 build
399 }
400
401 pub fn sink_id(&self) -> GlobalId {
407 let sink_exports = &self.sink_exports;
408 let sink_id = sink_exports.keys().into_element();
409 *sink_id
410 }
411}
412
413impl<P, S, T> DataflowDescription<P, S, T>
414where
415 P: CollectionPlan,
416{
417 pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
427 let mut out = BTreeSet::new();
428 self.depends_on_into(collection_id, &mut out);
429 out
430 }
431
432 pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
434 out.insert(collection_id);
435 if self.source_imports.contains_key(&collection_id) {
436 out.insert(collection_id);
439 return;
440 }
441
442 let mut found_index = false;
446 for (index_id, IndexImport { desc, .. }) in &self.index_imports {
447 if desc.on_id == collection_id {
448 out.insert(*index_id);
451 found_index = true;
452 }
453 }
454 if found_index {
455 return;
456 }
457
458 let build = self.build_desc(collection_id);
461 for id in build.plan.depends_on() {
462 if !out.contains(&id) {
463 self.depends_on_into(id, out)
464 }
465 }
466 }
467
468 pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
473 let is_import = |id: &GlobalId| {
474 self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
475 };
476
477 let deps = self.depends_on(collection_id);
478 deps.into_iter().filter(is_import).collect()
479 }
480}
481
impl<S, T> DataflowDescription<RenderPlan, S, T>
where
    S: Clone + PartialEq,
    T: Clone + timely::PartialOrder,
{
    /// Reports whether `other` is compatible with `self`: both are
    /// structurally equal after normalization (see `as_comparable`) and
    /// `other`'s `as_of` is no earlier than `self`'s. Missing `as_of`s on
    /// either side render the dataflows incompatible.
    pub fn compatible_with(&self, other: &Self) -> bool {
        let old = self.as_comparable();
        let new = other.as_comparable();

        // Structural equality on imports, builds, exports, and time
        // dependence. Note that `until`, `refresh_schedule`, `debug_name`,
        // and `initial_storage_as_of` are deliberately not compared.
        let equality = old.index_exports == new.index_exports
            && old.sink_exports == new.sink_exports
            && old.objects_to_build == new.objects_to_build
            && old.index_imports == new.index_imports
            && old.source_imports == new.source_imports
            && old.time_dependence == new.time_dependence;

        // The `as_of` may advance but must not regress.
        let partial = if let (Some(old_as_of), Some(new_as_of)) = (&old.as_of, &new.as_of) {
            timely::PartialOrder::less_equal(old_as_of, new_as_of)
        } else {
            false
        };

        equality && partial
    }

    /// Returns a copy of `self` normalized for comparison: transient IDs not
    /// visible outside the dataflow are renumbered densely and consistently,
    /// and source upper frontiers are cleared, so two dataflows that differ
    /// only in those details compare equal.
    fn as_comparable(&self) -> Self {
        // IDs visible outside the dataflow (imports and exports) must not be
        // renumbered.
        let external_ids: BTreeSet<_> = self.import_ids().chain(self.export_ids()).collect();

        let mut id_counter = 0;
        let mut replacements = BTreeMap::new();

        // Maps each internal transient ID to a fresh transient ID; memoized
        // so repeated occurrences of the same ID map consistently.
        let mut maybe_replace = |id: GlobalId| {
            if id.is_transient() && !external_ids.contains(&id) {
                *replacements.entry(id).or_insert_with(|| {
                    id_counter += 1;
                    GlobalId::Transient(id_counter)
                })
            } else {
                id
            }
        };

        // Source uppers change over time and must not affect comparison.
        let mut source_imports = self.source_imports.clone();
        for (_source, _monotonic, upper) in source_imports.values_mut() {
            *upper = Antichain::new();
        }

        let mut objects_to_build = self.objects_to_build.clone();
        for object in &mut objects_to_build {
            object.id = maybe_replace(object.id);
            object.plan.replace_ids(&mut maybe_replace);
        }

        let mut index_exports = self.index_exports.clone();
        for (desc, _typ) in index_exports.values_mut() {
            desc.on_id = maybe_replace(desc.on_id);
        }

        let mut sink_exports = self.sink_exports.clone();
        for desc in sink_exports.values_mut() {
            desc.from = maybe_replace(desc.from);
        }

        DataflowDescription {
            source_imports,
            index_imports: self.index_imports.clone(),
            objects_to_build,
            index_exports,
            sink_exports,
            as_of: self.as_of.clone(),
            until: self.until.clone(),
            initial_storage_as_of: self.initial_storage_as_of.clone(),
            refresh_schedule: self.refresh_schedule.clone(),
            debug_name: self.debug_name.clone(),
            time_dependence: self.time_dependence.clone(),
        }
    }
}
576
/// A `DataflowDescription` whose build plans are optimized MIR expressions
/// and whose storage metadata type is unit.
pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;
579
/// A description of an index: the collection it is on and the key by which
/// that collection is arranged.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
    /// The ID of the collection the index is on.
    pub on_id: GlobalId,
    /// The key expressions by which the collection is arranged (used as the
    /// `ArrangeBy` keys in `export_index`).
    pub key: Vec<MirScalarExpr>,
}
589
/// An index import, bundling the index description with metadata about the
/// indexed collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexImport {
    /// The description of the index: the collection it is on and its key.
    pub desc: IndexDesc,
    /// The relation type of the indexed collection (consulted by `arity_of`).
    pub typ: SqlRelationType,
    /// A monotonicity flag recorded at import time (see `import_index`).
    pub monotonic: bool,
}
600
/// An association of a global identifier to a plan for the dataflow to build.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
    /// The ID under which the built collection is registered.
    pub id: GlobalId,
    /// The plan to build, of dataflow-stage-specific type `P`.
    pub plan: P,
}