mz_transform/dataflow.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Whole-dataflow optimization
11//!
12//! A dataflow may contain multiple views, each of which may only be
13//! optimized locally. However, information like demand and predicate
14//! pushdown can be applied across views once we understand the context
15//! in which the views will be executed.
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use itertools::Itertools;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
21use mz_expr::{
22 AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
23 MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
24};
25use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
26use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
27use mz_repr::GlobalId;
28use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
29#[cfg(any(test, feature = "proptest"))]
30use proptest_derive::Arbitrary;
31use serde::{Deserialize, Serialize};
32
33use crate::monotonic::MonotonicFlag;
34use crate::notice::RawOptimizerNotice;
35use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};
36
37/// Optimizes the implementation of each dataflow.
38///
39/// Inlines views, performs a full optimization pass including physical
40/// planning using the supplied indexes, propagates filtering and projection
41/// information to dataflow sources and lifts monotonicity information.
42#[mz_ore::instrument(
43 target = "optimizer",
44 level = "debug",
45 fields(path.segment ="global")
46)]
47pub fn optimize_dataflow(
48 dataflow: &mut DataflowDesc,
49 transform_ctx: &mut TransformCtx,
50 fast_path_optimizer: bool,
51) -> Result<(), TransformError> {
52 // Inline views that are used in only one other view.
53 inline_views(dataflow)?;
54
55 if fast_path_optimizer {
56 optimize_dataflow_relations(
57 dataflow,
58 &Optimizer::fast_path_optimizer(transform_ctx),
59 transform_ctx,
60 )?;
61 } else {
62 // Logical optimization pass after view inlining
63 optimize_dataflow_relations(
64 dataflow,
65 #[allow(deprecated)]
66 &Optimizer::logical_optimizer(transform_ctx),
67 transform_ctx,
68 )?;
69
70 optimize_dataflow_filters(dataflow)?;
71 // TODO: when the linear operator contract ensures that propagated
72 // predicates are always applied, projections and filters can be removed
73 // from where they come from. Once projections and filters can be removed,
74 // TODO: it would be useful for demand to be optimized after filters
75 // that way demand only includes the columns that are still necessary after
76 // the filters are applied.
77 optimize_dataflow_demand(dataflow)?;
78
79 // A smaller logical optimization pass after projections and filters are
80 // pushed down across views.
81 optimize_dataflow_relations(
82 dataflow,
83 &Optimizer::logical_cleanup_pass(transform_ctx, false),
84 transform_ctx,
85 )?;
86
87 // Physical optimization pass
88 optimize_dataflow_relations(
89 dataflow,
90 &Optimizer::physical_optimizer(transform_ctx),
91 transform_ctx,
92 )?;
93
94 optimize_dataflow_monotonic(dataflow, transform_ctx)?;
95 }
96
97 prune_and_annotate_dataflow_index_imports(
98 dataflow,
99 transform_ctx.indexes,
100 transform_ctx.df_meta,
101 )?;
102
103 // Warning: If you want to add a transform call here, consider it very carefully whether it
104 // could accidentally invalidate information that we already derived above in
105 // `optimize_dataflow_monotonic` or `prune_and_annotate_dataflow_index_imports`.
106
107 mz_repr::explain::trace_plan(dataflow);
108
109 Ok(())
110}
111
112/// Inline views used in one other view, and in no exported objects.
113#[mz_ore::instrument(
114 target = "optimizer",
115 level = "debug",
116 fields(path.segment = "inline_views")
117)]
118fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
119 // We cannot inline anything whose `BuildDesc::id` appears in either the
120 // `index_exports` or `sink_exports` of `dataflow`, because we lose our
121 // ability to name it.
122
123 // A view can / should be in-lined in another view if it is only used by
124 // one subsequent view. If there are two distinct views that have not
125 // themselves been merged, then too bad and it doesn't get inlined.
126
127 // Starting from the *last* object to build, walk backwards and inline
128 // any view that is neither referenced by a `index_exports` nor
129 // `sink_exports` nor more than two remaining objects to build.
130
131 for index in (0..dataflow.objects_to_build.len()).rev() {
132 // Capture the name used by others to reference this view.
133 let global_id = dataflow.objects_to_build[index].id;
134 // Determine if any exports directly reference this view.
135 let mut occurs_in_export = false;
136 for (_gid, sink_desc) in dataflow.sink_exports.iter() {
137 if sink_desc.from == global_id {
138 occurs_in_export = true;
139 }
140 }
141 for (_, (index_desc, _)) in dataflow.index_exports.iter() {
142 if index_desc.on_id == global_id {
143 occurs_in_export = true;
144 }
145 }
146 // Count the number of subsequent views that reference this view.
147 let mut occurrences_in_later_views = Vec::new();
148 for other in (index + 1)..dataflow.objects_to_build.len() {
149 if dataflow.objects_to_build[other]
150 .plan
151 .depends_on()
152 .contains(&global_id)
153 {
154 occurrences_in_later_views.push(other);
155 }
156 }
157 // Inline if the view is referenced in one view and no exports.
158 if !occurs_in_export && occurrences_in_later_views.len() == 1 {
159 let other = occurrences_in_later_views[0];
160 // We can remove this view and insert it in the later view,
161 // but are not able to relocate the later view `other`.
162
163 // When splicing in the `index` view, we need to create disjoint
164 // identifiers for the Let's `body` and `value`, as well as a new
165 // identifier for the binding itself. Following `NormalizeLets`, we
166 // go with the binding first, then the value, then the body.
167 let mut id_gen = crate::IdGen::default();
168 let new_local = LocalId::new(id_gen.allocate_id());
169 // Use the same `id_gen` to assign new identifiers to `index`.
170 crate::normalize_lets::renumber_bindings(
171 dataflow.objects_to_build[index].plan.as_inner_mut(),
172 &mut id_gen,
173 )?;
174 // Assign new identifiers to the other relation.
175 crate::normalize_lets::renumber_bindings(
176 dataflow.objects_to_build[other].plan.as_inner_mut(),
177 &mut id_gen,
178 )?;
179 // Install the `new_local` name wherever `global_id` was used.
180 dataflow.objects_to_build[other]
181 .plan
182 .as_inner_mut()
183 .visit_pre_mut(|expr| {
184 if let MirRelationExpr::Get { id, .. } = expr {
185 if id == &Id::Global(global_id) {
186 *id = Id::Local(new_local);
187 }
188 }
189 });
190
191 // With identifiers rewritten, we can replace `other` with
192 // a `MirRelationExpr::Let` binding, whose value is `index` and
193 // whose body is `other`.
194 let body = dataflow.objects_to_build[other]
195 .plan
196 .as_inner_mut()
197 .take_dangerous();
198 let value = dataflow.objects_to_build[index]
199 .plan
200 .as_inner_mut()
201 .take_dangerous();
202 *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
203 id: new_local,
204 value: Box::new(value),
205 body: Box::new(body),
206 };
207 dataflow.objects_to_build.remove(index);
208 }
209 }
210
211 mz_repr::explain::trace_plan(dataflow);
212
213 Ok(())
214}
215
216/// Performs either the logical or the physical optimization pass on the
217/// dataflow using the supplied set of indexes.
218#[mz_ore::instrument(
219 target = "optimizer",
220 level = "debug",
221 fields(path.segment = optimizer.name)
222)]
223fn optimize_dataflow_relations(
224 dataflow: &mut DataflowDesc,
225 optimizer: &Optimizer,
226 ctx: &mut TransformCtx,
227) -> Result<(), TransformError> {
228 // Re-optimize each dataflow
229 for object in dataflow.objects_to_build.iter_mut() {
230 // Re-run all optimizations on the composite views.
231 ctx.set_global_id(object.id);
232 optimizer.transform(object.plan.as_inner_mut(), ctx)?;
233 ctx.reset_global_id();
234 }
235
236 mz_repr::explain::trace_plan(dataflow);
237
238 Ok(())
239}
240
241/// Pushes demand information from published outputs to dataflow inputs,
242/// projecting away unnecessary columns.
243///
244/// Dataflows that exist for the sake of generating plan explanations do not
245/// have published outputs. In this case, we push demand information from views
246/// not depended on by other views to dataflow inputs.
247#[mz_ore::instrument(
248 target = "optimizer",
249 level = "debug",
250 fields(path.segment ="demand")
251)]
252fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
253 // Maps id -> union of known columns demanded from the source/view with the
254 // corresponding id.
255 let mut demand = BTreeMap::new();
256
257 if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
258 // In the absence of any exports, just demand all columns from views
259 // that are not depended on by another view, which is currently the last
260 // object in `objects_to_build`.
261
262 // A DataflowDesc without exports is currently created in the context of
263 // EXPLAIN outputs. This ensures that the output has all the columns of
264 // the original explainee.
265 if let Some(build_desc) = dataflow.objects_to_build.iter_mut().rev().next() {
266 demand
267 .entry(Id::Global(build_desc.id))
268 .or_insert_with(BTreeSet::new)
269 .extend(0..build_desc.plan.as_inner_mut().arity());
270 }
271 } else {
272 // Demand all columns of inputs to sinks.
273 for (_id, sink) in dataflow.sink_exports.iter() {
274 let input_id = sink.from;
275 demand
276 .entry(Id::Global(input_id))
277 .or_insert_with(BTreeSet::new)
278 .extend(0..dataflow.arity_of(&input_id));
279 }
280
281 // Demand all columns of inputs to exported indexes.
282 for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
283 let input_id = desc.on_id;
284 demand
285 .entry(Id::Global(input_id))
286 .or_insert_with(BTreeSet::new)
287 .extend(0..dataflow.arity_of(&input_id));
288 }
289 }
290
291 optimize_dataflow_demand_inner(
292 dataflow
293 .objects_to_build
294 .iter_mut()
295 .rev()
296 .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
297 &mut demand,
298 )?;
299
300 mz_repr::explain::trace_plan(dataflow);
301
302 Ok(())
303}
304
305/// Pushes demand through views in `view_sequence` in order, removing
306/// columns not demanded.
307///
308/// This method is made public for the sake of testing.
309/// TODO: make this private once we allow multiple exports per dataflow.
310pub fn optimize_dataflow_demand_inner<'a, I>(
311 view_sequence: I,
312 demand: &mut BTreeMap<Id, BTreeSet<usize>>,
313) -> Result<(), TransformError>
314where
315 I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
316{
317 // Maps id -> The projection that was pushed down on the view with the
318 // corresponding id.
319 let mut applied_projection = BTreeMap::new();
320 // Collect the mutable references to views after pushing projection down
321 // in order to run cleanup actions on them in a second loop.
322 let mut view_refs = Vec::new();
323 let projection_pushdown = crate::movement::ProjectionPushdown::default();
324 for (id, view) in view_sequence {
325 if let Some(columns) = demand.get(&id) {
326 let projection_pushed_down = columns.iter().map(|c| *c).collect();
327 // Push down the projection consisting of the entries of `columns`
328 // in increasing order.
329 projection_pushdown.action(view, &projection_pushed_down, demand)?;
330 let new_type = view.typ();
331 applied_projection.insert(id, (projection_pushed_down, new_type));
332 }
333 view_refs.push(view);
334 }
335
336 for view in view_refs {
337 // Update `Get` nodes to reflect any columns that have been projected away.
338 projection_pushdown.update_projection_around_get(view, &applied_projection);
339 }
340
341 Ok(())
342}
343
344/// Pushes predicate to dataflow inputs.
345#[mz_ore::instrument(
346 target = "optimizer",
347 level = "debug",
348 fields(path.segment ="filters")
349)]
350fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
351 // Contains id -> predicates map, describing those predicates that
352 // can (but need not) be applied to the collection named by `id`.
353 let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();
354
355 // Propagate predicate information from outputs to inputs.
356 optimize_dataflow_filters_inner(
357 dataflow
358 .objects_to_build
359 .iter_mut()
360 .rev()
361 .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
362 &mut predicates,
363 )?;
364
365 // Push predicate information into the SourceDesc.
366 for (source_id, source_import) in dataflow.source_imports.iter_mut() {
367 let source = &mut source_import.desc;
368 if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
369 if !list.is_empty() {
370 // Canonicalize the order of predicates, for stable plans.
371 let mut list = list.into_iter().collect::<Vec<_>>();
372 list.sort();
373 // Install no-op predicate information if none exists.
374 if source.arguments.operators.is_none() {
375 source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
376 }
377 // Add any predicates that can be pushed to the source.
378 if let Some(operator) = source.arguments.operators.take() {
379 source.arguments.operators = Some(operator.filter(list));
380 source.arguments.operators.as_mut().map(|x| x.optimize());
381 }
382 }
383 }
384 }
385
386 mz_repr::explain::trace_plan(dataflow);
387
388 Ok(())
389}
390
391/// Pushes filters down through views in `view_sequence` in order.
392///
393/// This method is made public for the sake of testing.
394/// TODO: make this private once we allow multiple exports per dataflow.
395pub fn optimize_dataflow_filters_inner<'a, I>(
396 view_iter: I,
397 predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
398) -> Result<(), TransformError>
399where
400 I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
401{
402 let transform = crate::predicate_pushdown::PredicatePushdown::default();
403 for (id, view) in view_iter {
404 if let Some(list) = predicates.get(&id).clone() {
405 if !list.is_empty() {
406 *view = view.take_dangerous().filter(list.iter().cloned());
407 }
408 }
409 transform.action(view, predicates)?;
410 }
411 Ok(())
412}
413
414/// Propagates information about monotonic inputs through operators,
415/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
416#[mz_ore::instrument(
417 target = "optimizer",
418 level = "debug",
419 fields(path.segment ="monotonic")
420)]
421pub fn optimize_dataflow_monotonic(
422 dataflow: &mut DataflowDesc,
423 ctx: &mut TransformCtx,
424) -> Result<(), TransformError> {
425 let mut monotonic_ids = BTreeSet::new();
426 for (source_id, source_import) in dataflow.source_imports.iter() {
427 if source_import.monotonic {
428 monotonic_ids.insert(source_id.clone());
429 }
430 }
431 for (
432 _index_id,
433 IndexImport {
434 desc: index_desc,
435 monotonic,
436 ..
437 },
438 ) in dataflow.index_imports.iter()
439 {
440 if *monotonic {
441 monotonic_ids.insert(index_desc.on_id.clone());
442 }
443 }
444
445 let monotonic_flag = MonotonicFlag::default();
446
447 for build_desc in dataflow.objects_to_build.iter_mut() {
448 monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
449 }
450
451 mz_repr::explain::trace_plan(dataflow);
452
453 Ok(())
454}
455
456/// Determine whether we require snapshots from our durable source imports.
457/// (For example, these can often be skipped for simple subscribe queries.)
458pub fn optimize_dataflow_snapshot(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
459 // For every global id, true iff we need a snapshot for that global ID.
460 // This is computed bottom-up: subscribes may or may not require a snapshot from their inputs,
461 // index exports definitely do, and objects-to-build require a snapshot from their inputs if
462 // either they need to provide a snapshot as output or they may need snapshots internally, eg. to
463 // compute a join.
464 let mut downstream_requires_snapshot = BTreeMap::new();
465
466 for (_id, export) in &dataflow.sink_exports {
467 *downstream_requires_snapshot
468 .entry(Id::Global(export.from))
469 .or_default() |= export.with_snapshot;
470 }
471 for (_id, (export, _typ)) in &dataflow.index_exports {
472 *downstream_requires_snapshot
473 .entry(Id::Global(export.on_id))
474 .or_default() |= true;
475 }
476 for BuildDesc { id: _, plan } in dataflow.objects_to_build.iter().rev() {
477 // For now, we treat all intermediate nodes as potentially requiring a snapshot.
478 // Walk the AST, marking anything depended on by a compute object as snapshot-required.
479 let mut todo = vec![(true, &plan.0)];
480 while let Some((requires_snapshot, expr)) = todo.pop() {
481 match expr {
482 MirRelationExpr::Get { id, .. } => {
483 *downstream_requires_snapshot.entry(*id).or_default() |= requires_snapshot;
484 }
485 other => {
486 todo.extend(other.children().rev().map(|c| (true, c)));
487 }
488 }
489 }
490 }
491 for (id, import) in &mut dataflow.source_imports {
492 let with_snapshot = downstream_requires_snapshot
493 .entry(Id::Global(*id))
494 .or_default();
495
496 // As above, fetch the snapshot if there are any transformations on the raw source data.
497 // (And we'll always need to check for things like temporal filters, since those allow
498 // snapshot data to affect diffs at times past the as-of.)
499 *with_snapshot |= import.desc.arguments.operators.is_some();
500
501 import.with_snapshot = *with_snapshot;
502 }
503 for (_id, import) in &mut dataflow.index_imports {
504 let with_snapshot = downstream_requires_snapshot
505 .entry(Id::Global(import.desc.on_id))
506 .or_default();
507
508 import.with_snapshot = *with_snapshot;
509 }
510
511 Ok(())
512}
513
514/// Restricts the indexes imported by `dataflow` to only the ones it needs.
515/// It also adds to the `DataflowMetainfo` how each index will be used.
516/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
517/// their index usage types.
518///
519/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
520/// references.
521///
522/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
523/// `ArrangeBy`s at the top of unused Let bindings.
524#[mz_ore::instrument(
525 target = "optimizer",
526 level = "debug",
527 fields(path.segment = "index_imports")
528)]
529fn prune_and_annotate_dataflow_index_imports(
530 dataflow: &mut DataflowDesc,
531 indexes: &dyn IndexOracle,
532 dataflow_metainfo: &mut DataflowMetainfo,
533) -> Result<(), TransformError> {
534 // Preparation.
535 // Let's save the unique keys of the sources. This will inform which indexes to choose for full
536 // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
537 // sources that are not getting an indexed read.)
538 let mut source_keys = BTreeMap::new();
539 for build_desc in dataflow.objects_to_build.iter() {
540 build_desc
541 .plan
542 .as_inner()
543 .visit_pre(|expr: &MirRelationExpr| match expr {
544 MirRelationExpr::Get {
545 id: Id::Global(global_id),
546 typ,
547 ..
548 } => {
549 source_keys.entry(*global_id).or_insert_with(|| {
550 typ.keys
551 .iter()
552 .map(|key| {
553 key.iter()
554 // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
555 // later we can more easily compare index keys to these keys.
556 .map(|col_idx| MirScalarExpr::column(*col_idx))
557 .collect()
558 })
559 .collect()
560 });
561 }
562 _ => {}
563 });
564 }
565
566 // This will be a mapping of
567 // (ids used by exports and objects to build) ->
568 // (arrangement keys and usage types on that id that have been requested)
569 let mut index_reqs_by_id = BTreeMap::new();
570
571 // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
572 // for which we also have an available index.
573 for build_desc in dataflow.objects_to_build.iter_mut() {
574 CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
575 .collect_index_reqs(build_desc.plan.as_inner_mut())?;
576 }
577
578 // Collect index usages by `sink_exports`.
579 // A sink export sometimes wants to directly use an imported index. I know of one case where
580 // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
581 // `objects_to_build`, but will want to directly read from the index and write to a sink.
582 for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
583 // First, let's see if there exists an index on the id that the sink wants. If not, there is
584 // nothing we can do here.
585 if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
586 // If yes, then we'll add a request of _some_ index: If we already collected an index
587 // request on this id, then use that, otherwise use the above arbitrarily picked index.
588 let requested_idxs = index_reqs_by_id
589 .entry(sink_desc.from)
590 .or_insert_with(Vec::new);
591 if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
592 requested_idxs.push((
593 *already_req_idx_id,
594 already_req_key.clone(),
595 IndexUsageType::SinkExport,
596 ));
597 } else {
598 requested_idxs.push((
599 idx_id,
600 arbitrary_idx_key.to_owned(),
601 IndexUsageType::SinkExport,
602 ));
603 }
604 }
605 }
606
607 // Collect index usages by `index_exports`.
608 for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
609 // First, let's see if there exists an index on the id that the exported index is on. If
610 // not, there is nothing we can do here.
611 if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
612 // If yes, then we'll add an index request of some index: If we already collected an
613 // index request on this id, then use that, otherwise use the above arbitrarily picked
614 // index.
615 let requested_idxs = index_reqs_by_id
616 .entry(index_desc.on_id)
617 .or_insert_with(Vec::new);
618 if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
619 requested_idxs.push((
620 *already_req_idx_id,
621 already_req_key.clone(),
622 IndexUsageType::IndexExport,
623 ));
624 } else {
625 // This is surprising: Actually, an index creation dataflow always has a plan in
626 // `objects_to_build` that will have a Get of the object that the index is on (see
627 // `DataflowDescription::export_index`). Therefore, we should have already requested
628 // an index usage when seeing that Get in `CollectIndexRequests`.
629 soft_panic_or_log!(
630 "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
631 );
632 requested_idxs.push((
633 idx_id,
634 arbitrary_index_key.to_owned(),
635 IndexUsageType::IndexExport,
636 ));
637 }
638 }
639 }
640
641 // By now, `index_reqs_by_id` has all ids that we think might benefit from having an index on.
642 // Moreover, for each of these ids, if any index exists on it, then we should have already
643 // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
644 // Unknown usage, picking an arbitrary index.
645 for (id, index_reqs) in index_reqs_by_id.iter_mut() {
646 if index_reqs.is_empty() {
647 // Try to pick an arbitrary index to be fully scanned.
648 if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
649 soft_panic_or_log!(
650 "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
651id: {}, key: {:?}",
652 id,
653 key
654 );
655 index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
656 }
657 }
658 }
659
660 // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
661 // request on the same id.
662 // `full_scan_changes` saves the changes that we do: Each (Get id, index id) entry indicates
663 // that if a Get has that id, then any full scan index accesses on it should be changed to use
664 // the indicated index id.
665 let mut full_scan_changes = BTreeMap::new();
666 for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
667 // Let's choose a non-FullScan access (if exists).
668 if let Some((picked_idx, picked_idx_key)) = choose_index(
669 &source_keys,
670 get_id,
671 &index_reqs
672 .iter()
673 .filter_map(|(idx_id, key, usage_type)| match usage_type {
674 IndexUsageType::FullScan => None,
675 _ => Some((*idx_id, key.clone())),
676 })
677 .collect_vec(),
678 ) {
679 // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
680 for (idx_id, key, usage_type) in index_reqs {
681 match usage_type {
682 IndexUsageType::FullScan => {
683 full_scan_changes.insert(get_id, picked_idx);
684 *idx_id = picked_idx;
685 key.clone_from(&picked_idx_key);
686 }
687 _ => {}
688 }
689 }
690 }
691 }
692 // Apply the above full scan changes to also the Gets.
693 for build_desc in dataflow.objects_to_build.iter_mut() {
694 build_desc
695 .plan
696 .as_inner_mut()
697 .visit_pre_mut(|expr: &mut MirRelationExpr| {
698 match expr {
699 MirRelationExpr::Get {
700 id: Id::Global(global_id),
701 typ: _,
702 access_strategy: persist_or_index,
703 } => {
704 if let Some(new_idx_id) = full_scan_changes.get(global_id) {
705 match persist_or_index {
706 AccessStrategy::UnknownOrLocal => {
707 // Should have been already filled by `collect_index_reqs`.
708 unreachable!()
709 }
710 AccessStrategy::Persist => {
711 // We already know that it's an indexed access.
712 unreachable!()
713 }
714 AccessStrategy::SameDataflow => {
715 // We have not added such annotations yet.
716 unreachable!()
717 }
718 AccessStrategy::Index(accesses) => {
719 for (idx_id, usage_type) in accesses {
720 if matches!(usage_type, IndexUsageType::FullScan) {
721 *idx_id = *new_idx_id;
722 }
723 }
724 }
725 }
726 }
727 }
728 _ => {}
729 }
730 });
731 }
732
733 // Annotate index imports by their usage types
734 dataflow_metainfo.index_usage_types = BTreeMap::new();
735 for (
736 index_id,
737 IndexImport {
738 desc: index_desc,
739 typ: _,
740 monotonic: _,
741 with_snapshot: _,
742 },
743 ) in dataflow.index_imports.iter_mut()
744 {
745 // A sanity check that we are not importing an index that we are also exporting.
746 assert!(
747 !dataflow
748 .index_exports
749 .iter()
750 .map(|(exported_index_id, _)| exported_index_id)
751 .any(|exported_index_id| exported_index_id == index_id)
752 );
753
754 let mut new_usage_types = Vec::new();
755 // Let's see whether something has requested an index on this object that this imported
756 // index is on.
757 if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
758 for (req_idx_id, req_key, req_usage_type) in index_reqs {
759 if req_idx_id == index_id {
760 soft_assert_eq_or_log!(*req_key, index_desc.key);
761 new_usage_types.push(req_usage_type.clone());
762 }
763 }
764 }
765 if !new_usage_types.is_empty() {
766 dataflow_metainfo
767 .index_usage_types
768 .insert(*index_id, new_usage_types);
769 }
770 }
771
772 // Prune index imports to only those that are used
773 dataflow
774 .index_imports
775 .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));
776
777 // Determine AccessStrategy::SameDataflow accesses. These were classified as
778 // AccessStrategy::Persist inside collect_index_reqs, so now we check these, and if the id is of
779 // a collection that we are building ourselves, then we adjust the access strategy.
780 let mut objects_to_build_ids = BTreeSet::new();
781 for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
782 objects_to_build_ids.insert(id.clone());
783 }
784 for build_desc in dataflow.objects_to_build.iter_mut() {
785 build_desc
786 .plan
787 .as_inner_mut()
788 .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
789 MirRelationExpr::Get {
790 id: Id::Global(global_id),
791 typ: _,
792 access_strategy,
793 } => match access_strategy {
794 AccessStrategy::Persist => {
795 if objects_to_build_ids.contains(global_id) {
796 *access_strategy = AccessStrategy::SameDataflow;
797 }
798 }
799 _ => {}
800 },
801 _ => {}
802 });
803 }
804
805 // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
806 for build_desc in dataflow.objects_to_build.iter() {
807 build_desc
808 .plan
809 .as_inner()
810 .visit_pre(|expr: &MirRelationExpr| match expr {
811 MirRelationExpr::Get {
812 id: Id::Global(_),
813 typ: _,
814 access_strategy: AccessStrategy::Index(accesses),
815 } => {
816 for (idx_id, _) in accesses {
817 soft_assert_or_log!(
818 dataflow.index_imports.contains_key(idx_id),
819 "Dangling Get index annotation"
820 );
821 }
822 }
823 _ => {}
824 });
825 }
826
827 mz_repr::explain::trace_plan(dataflow);
828
829 Ok(())
830}
831
832/// Pick an index from a given Vec of index keys.
833///
834/// Currently, we pick as follows:
835/// - If there is an index on a unique key, then we pick that. (It might be better distributed, and
836/// is less likely to get dropped than other indexes.)
837/// - Otherwise, we pick an arbitrary index.
838///
839/// TODO: There are various edge cases where a better choice would be possible:
840/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
841/// capture this already.)
842/// - Some indexes might have an error, while others don't.
843/// <https://github.com/MaterializeInc/database-issues/issues/4455>
844/// - Some indexes might have more extra data in their keys (because of being on more complicated
845/// expressions than just column references), which won't be used in a full scan.
846fn choose_index(
847 source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
848 id: &GlobalId,
849 indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
850) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
851 match source_keys.get(id) {
852 None => indexes.iter().next().cloned(), // pick an arbitrary index
853 Some(coll_keys) => match indexes
854 .iter()
855 .find(|(_idx_id, key)| coll_keys.contains(&*key))
856 {
857 Some((idx_id, key)) => Some((*idx_id, key.clone())),
858 None => indexes.iter().next().cloned(), // pick an arbitrary index
859 },
860 }
861}
862
863#[derive(Debug)]
864struct CollectIndexRequests<'a> {
865 /// We were told about these unique keys on sources.
866 source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
867 /// We were told about these indexes being available.
868 indexes_available: &'a dyn IndexOracle,
869 /// We'll be collecting index requests here.
870 index_reqs_by_id:
871 &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
872 /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
873 /// current node (see docs on `IndexUsageContext` about what context we keep).
874 /// Moreover, we need to propagate this context from cte uses to cte definitions.
875 /// `context_across_lets` will keep track of the contexts that reached each use of a LocalId
876 /// added together.
877 context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
878 recursion_guard: RecursionGuard,
879}
880
881impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
882 fn recursion_guard(&self) -> &RecursionGuard {
883 &self.recursion_guard
884 }
885}
886
887impl<'a> CollectIndexRequests<'a> {
888 fn new(
889 source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
890 indexes_available: &'a dyn IndexOracle,
891 index_reqs_by_id: &'a mut BTreeMap<
892 GlobalId,
893 Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
894 >,
895 ) -> CollectIndexRequests<'a> {
896 CollectIndexRequests {
897 source_keys,
898 indexes_available,
899 index_reqs_by_id,
900 context_across_lets: BTreeMap::new(),
901 recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
902 }
903 }
904
905 pub fn collect_index_reqs(
906 &mut self,
907 expr: &mut MirRelationExpr,
908 ) -> Result<(), RecursionLimitError> {
909 assert!(self.context_across_lets.is_empty());
910 self.collect_index_reqs_inner(
911 expr,
912 &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
913 )?;
914 assert!(self.context_across_lets.is_empty());
915 // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
916 for (_id, index_reqs) in self.index_reqs_by_id.iter() {
917 for (_, _, index_usage_type) in index_reqs {
918 soft_assert_or_log!(
919 !matches!(
920 index_usage_type,
921 IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
922 ),
923 "Delta join Unknown index usage remained"
924 );
925 }
926 }
927 Ok(())
928 }
929
930 fn collect_index_reqs_inner(
931 &mut self,
932 expr: &mut MirRelationExpr,
933 contexts: &Vec<IndexUsageContext>,
934 ) -> Result<(), RecursionLimitError> {
935 self.checked_recur_mut(|this| {
936 // If an index exists on `on_id`, this function picks an index to be fully scanned.
937 let pick_index_for_full_scan = |on_id: &GlobalId| {
938 // Note that the choice we make here might be modified later at the
939 // "Adjust FullScans to not introduce a new index dependency".
940 choose_index(
941 this.source_keys,
942 on_id,
943 &this
944 .indexes_available
945 .indexes_on(*on_id)
946 .map(|(idx_id, key)| (idx_id, key.iter().cloned().collect_vec()))
947 .collect_vec(),
948 )
949 };
950
951 // See comment on `IndexUsageContext`.
952 Ok(match expr {
953 MirRelationExpr::Join {
954 inputs,
955 implementation,
956 ..
957 } => {
958 match implementation {
959 JoinImplementation::Differential(..) => {
960 for input in inputs {
961 this.collect_index_reqs_inner(
962 input,
963 &IndexUsageContext::from_usage_type(
964 IndexUsageType::DifferentialJoin,
965 ),
966 )?;
967 }
968 }
969 JoinImplementation::DeltaQuery(..) => {
970 // For Delta joins, the first input is special, see
971 // https://github.com/MaterializeInc/database-issues/issues/2115
972 this.collect_index_reqs_inner(
973 &mut inputs[0],
974 &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
975 DeltaJoinIndexUsageType::Unknown,
976 )),
977 )?;
978 for input in &mut inputs[1..] {
979 this.collect_index_reqs_inner(
980 input,
981 &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
982 DeltaJoinIndexUsageType::Lookup,
983 )),
984 )?;
985 }
986 }
987 JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
988 for input in inputs {
989 this.collect_index_reqs_inner(
990 input,
991 &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(
992 *idx_id,
993 )),
994 )?;
995 }
996 }
997 JoinImplementation::Unimplemented => {
998 soft_panic_or_log!(
999 "CollectIndexRequests encountered an Unimplemented join"
1000 );
1001 }
1002 }
1003 }
1004 MirRelationExpr::ArrangeBy { input, keys } => {
1005 let ctx = &IndexUsageContext::add_keys(contexts, keys);
1006 this.collect_index_reqs_inner(input, ctx)?;
1007 }
1008 MirRelationExpr::Get {
1009 id: Id::Global(global_id),
1010 access_strategy: persist_or_index,
1011 ..
1012 } => {
1013 this.index_reqs_by_id
1014 .entry(*global_id)
1015 .or_insert_with(Vec::new);
1016 // If the context is empty, it means we didn't see an operator that would
1017 // specifically want to use an index for this Get. However, let's still try to
1018 // find an index for a full scan.
1019 let mut try_full_scan = contexts.is_empty();
1020 let mut index_accesses = Vec::new();
1021 for context in contexts {
1022 match &context.requested_keys {
1023 None => {
1024 // We have some index usage context, but didn't see an `ArrangeBy`.
1025 try_full_scan = true;
1026 match context.usage_type {
1027 IndexUsageType::FullScan
1028 | IndexUsageType::SinkExport
1029 | IndexUsageType::IndexExport => {
1030 // Not possible, because these don't go through
1031 // IndexUsageContext at all.
1032 unreachable!()
1033 }
1034 // You can find more info on why the following join cases
1035 // shouldn't happen in comments of the Join lowering to LIR.
1036 IndexUsageType::Lookup(_) => soft_panic_or_log!(
1037 "CollectIndexRequests encountered \
1038 an IndexedFilter join without an ArrangeBy"
1039 ),
1040 IndexUsageType::DifferentialJoin => soft_panic_or_log!(
1041 "CollectIndexRequests encountered \
1042 a Differential join without an ArrangeBy"
1043 ),
1044 IndexUsageType::DeltaJoin(_) => soft_panic_or_log!(
1045 "CollectIndexRequests encountered \
1046 a Delta join without an ArrangeBy"
1047 ),
1048 IndexUsageType::PlanRootNoArrangement => {
1049 // This is ok: the entire plan is a `Get`, with not even an
1050 // `ArrangeBy`. Note that if an index exists, the usage will
1051 // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
1052 // because we are going into the `try_full_scan` if.
1053 }
1054 IndexUsageType::FastPathLimit => {
1055 // These are created much later, not even inside
1056 // `prune_and_annotate_dataflow_index_imports`.
1057 unreachable!()
1058 }
1059 IndexUsageType::DanglingArrangeBy => {
1060 // Not possible, because we create `DanglingArrangeBy`
1061 // only when we see an `ArrangeBy`.
1062 unreachable!()
1063 }
1064 IndexUsageType::Unknown => {
1065 // These are added only after `CollectIndexRequests` has run.
1066 unreachable!()
1067 }
1068 }
1069 }
1070 Some(requested_keys) => {
1071 for requested_key in requested_keys {
1072 match this.indexes_available.indexes_on(*global_id).find(
1073 |(available_idx_id, available_key)| {
1074 match context.usage_type {
1075 IndexUsageType::Lookup(req_idx_id) => {
1076 // `LiteralConstraints` already picked an index
1077 // by id. Let's use that one.
1078 assert!(
1079 !(available_idx_id == &req_idx_id
1080 && available_key != &requested_key)
1081 );
1082 available_idx_id == &req_idx_id
1083 }
1084 _ => available_key == &requested_key,
1085 }
1086 },
1087 ) {
1088 Some((idx_id, key)) => {
1089 let usage = context.usage_type.clone();
1090 this.index_reqs_by_id
1091 .get_mut(global_id)
1092 .unwrap()
1093 .push((idx_id, key.to_owned(), usage.clone()));
1094 index_accesses.push((idx_id, usage));
1095 }
1096 None => {
1097 // If there is a key requested for which we don't have an
1098 // index, then we might still be able to do a full scan of a
1099 // differently keyed index.
1100 try_full_scan = true;
1101 }
1102 }
1103 }
1104 if requested_keys.is_empty() {
1105 // It's a bit weird if an MIR ArrangeBy is not requesting any
1106 // key, but let's try a full scan in that case anyhow.
1107 try_full_scan = true;
1108 }
1109 }
1110 }
1111 }
1112 if try_full_scan {
1113 // Keep in mind that when having 2 contexts coming from 2 uses of a Let,
1114 // this code can't distinguish between the case when there is 1 ArrangeBy at the
1115 // top of the Let, or when the 2 uses each have an `ArrangeBy`. In both cases,
1116 // we'll add only 1 full scan, which would be wrong in the latter case. However,
1117 // the latter case can't currently happen until we do
1118 // https://github.com/MaterializeInc/database-issues/issues/6363
1119 // Also note that currently we are deduplicating index usage types when
1120 // printing index usages in EXPLAIN.
1121 if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
1122 this.index_reqs_by_id.get_mut(global_id).unwrap().push((
1123 idx_id,
1124 key.to_owned(),
1125 IndexUsageType::FullScan,
1126 ));
1127 index_accesses.push((idx_id, IndexUsageType::FullScan));
1128 }
1129 }
1130 if index_accesses.is_empty() {
1131 *persist_or_index = AccessStrategy::Persist;
1132 } else {
1133 *persist_or_index = AccessStrategy::Index(index_accesses);
1134 }
1135 }
1136 MirRelationExpr::Get {
1137 id: Id::Local(local_id),
1138 ..
1139 } => {
1140 // Add the current context to the vector of contexts of `local_id`.
1141 // (The unwrap is safe, because the Let and LetRec cases start with inserting an
1142 // empty entry.)
1143 this.context_across_lets
1144 .get_mut(local_id)
1145 .unwrap()
1146 .extend(contexts.iter().cloned());
1147 // No recursive call here, because Get has no inputs.
1148 }
1149 MirRelationExpr::Let { id, value, body } => {
1150 let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
1151 // No shadowing in MIR
1152 assert_none!(shadowed_context);
1153 // We go backwards: Recurse on the body and then the value.
1154 this.collect_index_reqs_inner(body, contexts)?;
1155 // The above call filled in the entry for `id` in `context_across_lets` (if it
1156 // was referenced). Anyhow, at least an empty entry should exist, because we started
1157 // above with inserting it.
1158 this.collect_index_reqs_inner(value, &this.context_across_lets[id].clone())?;
1159 // Clean up the id from the saved contexts.
1160 this.context_across_lets.remove(id);
1161 }
1162 MirRelationExpr::LetRec {
1163 ids,
1164 values,
1165 limits: _,
1166 body,
1167 } => {
1168 for id in ids.iter() {
1169 let shadowed_context =
1170 this.context_across_lets.insert(id.clone(), Vec::new());
1171 // No shadowing in MIR
1172 assert_none!(shadowed_context);
1173 }
1174 // We go backwards: Recurse on the body first.
1175 this.collect_index_reqs_inner(body, contexts)?;
1176 // Reset the contexts of the ids (of the current LetRec), because an arrangement
1177 // from a value can't be used in the body.
1178 for id in ids.iter() {
1179 *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
1180 }
1181 // Recurse on the values in reverse order.
1182 // Note that we do only one pass, i.e., we won't see context through a Get that
1183 // refers to the previous iteration. But this is ok, because we can't reuse
1184 // arrangements across iterations anyway.
1185 for (id, value) in ids.iter().rev().zip_eq(values.iter_mut().rev()) {
1186 this.collect_index_reqs_inner(
1187 value,
1188 &this.context_across_lets[id].clone(),
1189 )?;
1190 }
1191 // Clean up the ids from the saved contexts.
1192 for id in ids {
1193 this.context_across_lets.remove(id);
1194 }
1195 }
1196 _ => {
1197 // Nothing interesting at this node, recurse with the empty context (regardless of
1198 // what context we got from above).
1199 let empty_context = Vec::new();
1200 for child in expr.children_mut() {
1201 this.collect_index_reqs_inner(child, &empty_context)?;
1202 }
1203 }
1204 })
1205 })
1206 }
1207}
1208
1209/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
1210/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
1211/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
1212/// looking for is
1213/// ```text
1214/// <operation that uses an index>
1215/// ArrangeBy <requested_keys>
1216/// Get <global_id>
1217/// ```
1218/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
1219/// pattern is present above the `Get`.
1220///
1221/// Note that we usually put this struct in a Vec, because we track context across local let
1222/// bindings, which means that a node can have multiple parents.
1223#[derive(Debug, Clone)]
1224struct IndexUsageContext {
1225 usage_type: IndexUsageType,
1226 requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
1227}
1228
1229impl IndexUsageContext {
1230 pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
1231 vec![IndexUsageContext {
1232 usage_type,
1233 requested_keys: None,
1234 }]
1235 }
1236
1237 // Add the keys of an ArrangeBy into the contexts.
1238 // Soft_panics if haven't already seen something that indicates what the index will be used for.
1239 pub fn add_keys(
1240 old_contexts: &Vec<IndexUsageContext>,
1241 keys_to_add: &Vec<Vec<MirScalarExpr>>,
1242 ) -> Vec<IndexUsageContext> {
1243 let old_contexts = if old_contexts.is_empty() {
1244 // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
1245 soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
1246 // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
1247 // have a place to note down the requested keys below.
1248 IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
1249 } else {
1250 old_contexts.clone()
1251 };
1252 old_contexts
1253 .into_iter()
1254 .flat_map(|old_context| {
1255 if !matches!(
1256 old_context.usage_type,
1257 IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
1258 ) {
1259 // If it's not an unknown delta join usage, then we simply note down the new
1260 // keys into `requested_keys`.
1261 let mut context = old_context.clone();
1262 if context.requested_keys.is_none() {
1263 context.requested_keys = Some(BTreeSet::new());
1264 }
1265 context
1266 .requested_keys
1267 .as_mut()
1268 .unwrap()
1269 .extend(keys_to_add.iter().cloned());
1270 Some(context).into_iter().chain(None)
1271 } else {
1272 // If it's an unknown delta join usage, then we need to figure out whether this
1273 // is a full scan or a lookup.
1274 //
1275 // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
1276 // scan when starting the rendering of a delta path. This is the one for which
1277 // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
1278 //
1279 // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
1280 // the `source_key` based on the MIR plan. We do this by doing the same as
1281 // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
1282 let source_key = keys_to_add
1283 .iter()
1284 .min()
1285 .expect("ArrangeBy below a delta join has at least one key");
1286 let full_scan_context = IndexUsageContext {
1287 requested_keys: Some(BTreeSet::from([source_key.clone()])),
1288 usage_type: IndexUsageType::DeltaJoin(
1289 DeltaJoinIndexUsageType::FirstInputFullScan,
1290 ),
1291 };
1292 let lookup_keys = keys_to_add
1293 .into_iter()
1294 .filter(|key| *key != source_key)
1295 .cloned()
1296 .collect_vec();
1297 if lookup_keys.is_empty() {
1298 Some(full_scan_context).into_iter().chain(None)
1299 } else {
1300 let lookup_context = IndexUsageContext {
1301 requested_keys: Some(lookup_keys.into_iter().collect()),
1302 usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
1303 };
1304 Some(full_scan_context)
1305 .into_iter()
1306 .chain(Some(lookup_context))
1307 }
1308 }
1309 })
1310 .collect()
1311 }
1312}
1313
1314/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
1315/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
1316#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1317#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
1318pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
1319 /// Notices that the optimizer wants to show to users.
1320 /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
1321 pub optimizer_notices: Vec<Notice>,
1322 /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
1323 /// `prune_and_annotate_dataflow_index_imports`.
1324 pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
1325}
1326
1327impl<Notice> Default for DataflowMetainfo<Notice> {
1328 fn default() -> Self {
1329 DataflowMetainfo {
1330 optimizer_notices: Vec::new(),
1331 index_usage_types: BTreeMap::new(),
1332 }
1333 }
1334}
1335
1336impl<Notice> DataflowMetainfo<Notice> {
1337 /// Create a [`UsedIndexes`] instance by resolving each `id` in the
1338 /// `index_ids` iterator against an entry expected to exist in the
1339 /// [`DataflowMetainfo::index_usage_types`].
1340 pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
1341 UsedIndexes::new(
1342 df_desc
1343 .index_imports
1344 .iter()
1345 .map(|(id, _)| {
1346 let entry = self.index_usage_types.get(id).cloned();
1347 // If an entry does not exist, mark the usage type for this
1348 // index as `Unknown`.
1349 //
1350 // This should never happen if this method is called after
1351 // running `prune_and_annotate_dataflow_index_imports` on
1352 // the dataflow (this happens at the end of the
1353 // `optimize_dataflow` call).
1354 let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);
1355
1356 (*id, index_usage_type)
1357 })
1358 .collect(),
1359 )
1360 }
1361}
1362
1363impl DataflowMetainfo<RawOptimizerNotice> {
1364 /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
1365 /// only if the exact same notice is not already present.
1366 pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
1367 where
1368 T: Into<RawOptimizerNotice>,
1369 {
1370 let notice = notice.into();
1371 if !self.optimizer_notices.contains(¬ice) {
1372 self.optimizer_notices.push(notice);
1373 }
1374 }
1375}