mz_transform/dataflow.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Whole-dataflow optimization
//!
//! A dataflow may contain multiple views, each of which may only be
//! optimized locally. However, information like demand and predicate
//! pushdown can be applied across views once we understand the context
//! in which the views will be executed.

use std::collections::{BTreeMap, BTreeSet};

use itertools::Itertools;
use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
use mz_expr::{
    AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
};
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::GlobalId;
use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

use crate::monotonic::MonotonicFlag;
use crate::notice::RawOptimizerNotice;
use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};

/// Optimizes the implementation of each dataflow.
///
/// Inlines views, performs a full optimization pass including physical
/// planning using the supplied indexes, propagates filtering and projection
/// information to dataflow sources, and lifts monotonicity information.
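///
/// In outline, the full (non fast path) pipeline runs these passes in order:
/// view inlining; a logical optimization pass; filter and demand pushdown
/// across views; a smaller logical cleanup pass; physical optimization;
/// monotonicity propagation; and finally index import pruning/annotation.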
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "global")
)]
pub fn optimize_dataflow(
    dataflow: &mut DataflowDesc,
    transform_ctx: &mut TransformCtx,
    fast_path_optimizer: bool,
) -> Result<(), TransformError> {
    // Inline views that are used in only one other view.
    inline_views(dataflow)?;

    if fast_path_optimizer {
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::fast_path_optimizer(transform_ctx),
            transform_ctx,
        )?;
    } else {
        // Logical optimization pass after view inlining.
        optimize_dataflow_relations(
            dataflow,
            #[allow(deprecated)]
            &Optimizer::logical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        optimize_dataflow_filters(dataflow)?;
        // TODO: Once the linear operator contract ensures that propagated
        // predicates are always applied, projections and filters can be
        // removed from where they came from.
        // TODO: Once projections and filters can be removed, it would be
        // useful to optimize demand after filters, so that demand only
        // includes the columns that are still necessary after the filters
        // are applied.
        optimize_dataflow_demand(dataflow)?;

        // A smaller logical optimization pass after projections and filters are
        // pushed down across views.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::logical_cleanup_pass(transform_ctx, false),
            transform_ctx,
        )?;

        // Physical optimization pass.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::physical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        optimize_dataflow_monotonic(dataflow, transform_ctx)?;
    }

    prune_and_annotate_dataflow_index_imports(
        dataflow,
        transform_ctx.indexes,
        transform_ctx.df_meta,
    )?;

    // Warning: If you want to add a transform call here, consider very
    // carefully whether it could accidentally invalidate information that we
    // already derived above in `optimize_dataflow_monotonic` or
    // `prune_and_annotate_dataflow_index_imports`.

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Inline views used in one other view, and in no exported objects.
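///
/// For example (a sketch with hypothetical ids): if `v2` is the only user of
/// `v1`, and neither is exported, then `v2`'s plan is rewritten to
/// ```text
/// Let l0 = <plan of v1> in <plan of v2, with `Get v1` replaced by `Get l0`>
/// ```
/// and `v1` is removed from `objects_to_build`.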
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "inline_views")
)]
fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // We cannot inline anything whose `BuildDesc::id` appears in either the
    // `index_exports` or `sink_exports` of `dataflow`, because we lose our
    // ability to name it.

    // A view can / should be inlined into another view if it is used by only
    // one subsequent view. If it is used by two distinct views that have not
    // themselves been merged, then too bad and it doesn't get inlined.

    // Starting from the *last* object to build, walk backwards and inline
    // any view that is referenced neither by `index_exports` nor by
    // `sink_exports` nor by more than one of the remaining objects to build.

    for index in (0..dataflow.objects_to_build.len()).rev() {
        // Capture the name used by others to reference this view.
        let global_id = dataflow.objects_to_build[index].id;
        // Determine if any exports directly reference this view.
        let mut occurs_in_export = false;
        for (_gid, sink_desc) in dataflow.sink_exports.iter() {
            if sink_desc.from == global_id {
                occurs_in_export = true;
            }
        }
        for (_, (index_desc, _)) in dataflow.index_exports.iter() {
            if index_desc.on_id == global_id {
                occurs_in_export = true;
            }
        }
        // Count the number of subsequent views that reference this view.
        let mut occurrences_in_later_views = Vec::new();
        for other in (index + 1)..dataflow.objects_to_build.len() {
            if dataflow.objects_to_build[other]
                .plan
                .depends_on()
                .contains(&global_id)
            {
                occurrences_in_later_views.push(other);
            }
        }
        // Inline if the view is referenced by exactly one later view and by
        // no exports.
        if !occurs_in_export && occurrences_in_later_views.len() == 1 {
            let other = occurrences_in_later_views[0];
            // We can remove this view and insert it in the later view,
            // but are not able to relocate the later view `other`.

            // When splicing in the `index` view, we need to create disjoint
            // identifiers for the Let's `body` and `value`, as well as a new
            // identifier for the binding itself. Following `NormalizeLets`, we
            // go with the binding first, then the value, then the body.
            let mut id_gen = crate::IdGen::default();
            let new_local = LocalId::new(id_gen.allocate_id());
            // Use the same `id_gen` to assign new identifiers to `index`.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[index].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Assign new identifiers to the other relation.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[other].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Install the `new_local` name wherever `global_id` was used.
            dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .visit_pre_mut(|expr| {
                    if let MirRelationExpr::Get { id, .. } = expr {
                        if id == &Id::Global(global_id) {
                            *id = Id::Local(new_local);
                        }
                    }
                });

            // With identifiers rewritten, we can replace `other` with
            // a `MirRelationExpr::Let` binding, whose value is `index` and
            // whose body is `other`.
            let body = dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .take_dangerous();
            let value = dataflow.objects_to_build[index]
                .plan
                .as_inner_mut()
                .take_dangerous();
            *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
                id: new_local,
                value: Box::new(value),
                body: Box::new(body),
            };
            dataflow.objects_to_build.remove(index);
        }
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Performs either the logical or the physical optimization pass on the
/// dataflow using the supplied set of indexes.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = optimizer.name)
)]
fn optimize_dataflow_relations(
    dataflow: &mut DataflowDesc,
    optimizer: &Optimizer,
    ctx: &mut TransformCtx,
) -> Result<(), TransformError> {
    // Re-optimize each object in the dataflow.
    for object in dataflow.objects_to_build.iter_mut() {
        // Re-run all optimizations on the composite views.
        ctx.set_global_id(object.id);
        optimizer.transform(object.plan.as_inner_mut(), ctx)?;
        ctx.reset_global_id();
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes demand information from published outputs to dataflow inputs,
/// projecting away unnecessary columns.
///
/// Dataflows that exist for the sake of generating plan explanations do not
/// have published outputs. In this case, we push demand information from views
/// not depended on by other views to dataflow inputs.
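///
/// For example (with hypothetical ids): a sink on `v1` initially demands all
/// of `v1`'s columns; if `v1` projects only columns #0 and #2 of `Get v0`,
/// the pushdown records in the demand map that only columns `{0, 2}` of `v0`
/// are needed, so `v0` (and ultimately the sources it reads) can project the
/// rest away.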
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "demand")
)]
fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // Maps id -> union of known columns demanded from the source/view with the
    // corresponding id.
    let mut demand = BTreeMap::new();

    if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
        // In the absence of any exports, just demand all columns from views
        // that are not depended on by another view, which is currently the last
        // object in `objects_to_build`.

        // A DataflowDesc without exports is currently created in the context of
        // EXPLAIN outputs. This ensures that the output has all the columns of
        // the original explainee.
        if let Some(build_desc) = dataflow.objects_to_build.last_mut() {
            demand
                .entry(Id::Global(build_desc.id))
                .or_insert_with(BTreeSet::new)
                .extend(0..build_desc.plan.as_inner_mut().arity());
        }
    } else {
        // Demand all columns of inputs to sinks.
        for (_id, sink) in dataflow.sink_exports.iter() {
            let input_id = sink.from;
            demand
                .entry(Id::Global(input_id))
                .or_insert_with(BTreeSet::new)
                .extend(0..dataflow.arity_of(&input_id));
        }

        // Demand all columns of inputs to exported indexes.
        for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
            let input_id = desc.on_id;
            demand
                .entry(Id::Global(input_id))
                .or_insert_with(BTreeSet::new)
                .extend(0..dataflow.arity_of(&input_id));
        }
    }

    optimize_dataflow_demand_inner(
        dataflow
            .objects_to_build
            .iter_mut()
            .rev()
            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
        &mut demand,
    )?;

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes demand through views in `view_sequence` in order, removing
/// columns not demanded.
///
/// This method is made public for the sake of testing.
/// TODO: make this private once we allow multiple exports per dataflow.
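///
/// The pass makes two sweeps: it first pushes a projection into each view (in
/// the given order), and then revisits every view to patch up the `Get` nodes
/// so that they agree with the projections applied around their referents.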
pub fn optimize_dataflow_demand_inner<'a, I>(
    view_sequence: I,
    demand: &mut BTreeMap<Id, BTreeSet<usize>>,
) -> Result<(), TransformError>
where
    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
{
    // Maps id -> the projection that was pushed down on the view with the
    // corresponding id.
    let mut applied_projection = BTreeMap::new();
    // Collect the mutable references to views after pushing projections down
    // in order to run cleanup actions on them in a second loop.
    let mut view_refs = Vec::new();
    let projection_pushdown = crate::movement::ProjectionPushdown::default();
    for (id, view) in view_sequence {
        if let Some(columns) = demand.get(&id) {
            let projection_pushed_down = columns.iter().copied().collect();
            // Push down the projection consisting of the entries of `columns`
            // in increasing order.
            projection_pushdown.action(view, &projection_pushed_down, demand)?;
            let new_type = view.typ();
            applied_projection.insert(id, (projection_pushed_down, new_type));
        }
        view_refs.push(view);
    }

    for view in view_refs {
        // Update `Get` nodes to reflect any columns that have been projected away.
        projection_pushdown.update_projection_around_get(view, &applied_projection);
    }

    Ok(())
}

/// Pushes predicates to dataflow inputs.
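///
/// For example (a sketch): if every consumer of a source applies the predicate
/// `#0 = 5`, the predicate is installed in the source's `MapFilterProject`
/// (see below), so rows are filtered as they are read from the source rather
/// than in a downstream operator.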
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "filters")
)]
fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // Contains id -> predicates map, describing those predicates that
    // can (but need not) be applied to the collection named by `id`.
    let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();

    // Propagate predicate information from outputs to inputs.
    optimize_dataflow_filters_inner(
        dataflow
            .objects_to_build
            .iter_mut()
            .rev()
            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
        &mut predicates,
    )?;

    // Push predicate information into the SourceDesc.
    for (source_id, source_import) in dataflow.source_imports.iter_mut() {
        let source = &mut source_import.desc;
        if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
            if !list.is_empty() {
                // Canonicalize the order of predicates, for stable plans.
                let mut list = list.into_iter().collect::<Vec<_>>();
                list.sort();
                // Install no-op predicate information if none exists.
                if source.arguments.operators.is_none() {
                    source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
                }
                // Add any predicates that can be pushed to the source.
                if let Some(operator) = source.arguments.operators.take() {
                    let mut operator = operator.filter(list);
                    operator.optimize();
                    source.arguments.operators = Some(operator);
                }
            }
        }
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes filters down through views in `view_sequence` in order.
///
/// This method is made public for the sake of testing.
/// TODO: make this private once we allow multiple exports per dataflow.
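///
/// Walking from outputs to inputs, each view is first wrapped in a `Filter`
/// with the predicates recorded for its id in `predicates` (predicates that
/// its consumers can tolerate seeing applied), and `PredicatePushdown` then
/// pushes these further toward the leaves, recording entries for the ids that
/// the view itself reads.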
pub fn optimize_dataflow_filters_inner<'a, I>(
    view_iter: I,
    predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
) -> Result<(), TransformError>
where
    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
{
    let transform = crate::predicate_pushdown::PredicatePushdown::default();
    for (id, view) in view_iter {
        if let Some(list) = predicates.get(&id) {
            if !list.is_empty() {
                *view = view.take_dangerous().filter(list.iter().cloned());
            }
        }
        transform.action(view, predicates)?;
    }
    Ok(())
}

/// Propagates information about monotonic inputs through operators,
/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
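///
/// A monotonic collection never retracts previously emitted records. Rendering
/// can exploit this, e.g., by choosing monotonic variants of operators such as
/// TopK or MIN/MAX reductions that don't need to maintain state for handling
/// retractions.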
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "monotonic")
)]
pub fn optimize_dataflow_monotonic(
    dataflow: &mut DataflowDesc,
    ctx: &mut TransformCtx,
) -> Result<(), TransformError> {
    let mut monotonic_ids = BTreeSet::new();
    for (source_id, source_import) in dataflow.source_imports.iter() {
        if source_import.monotonic {
            monotonic_ids.insert(*source_id);
        }
    }
    for (
        _index_id,
        IndexImport {
            desc: index_desc,
            monotonic,
            ..
        },
    ) in dataflow.index_imports.iter()
    {
        if *monotonic {
            monotonic_ids.insert(index_desc.on_id);
        }
    }

    let monotonic_flag = MonotonicFlag::default();

    for build_desc in dataflow.objects_to_build.iter_mut() {
        monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Determine whether we require snapshots from our durable source imports.
/// (For example, these can often be skipped for simple subscribe queries.)
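///
/// For example, a `SUBSCRIBE` created with `WITH (SNAPSHOT = false)` that
/// reads straight from a source can let the source import skip its snapshot,
/// unless predicates were pushed into the source, in which case the snapshot
/// is still fetched (see the `operators.is_some()` check below).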
pub fn optimize_dataflow_snapshot(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // For every global id, true iff we need a snapshot for that global id.
    // This is computed bottom-up: subscribes may or may not require a snapshot from their inputs,
    // index exports definitely do, and objects-to-build require a snapshot from their inputs if
    // either they need to provide a snapshot as output or they may need snapshots internally,
    // e.g., to compute a join.
    let mut downstream_requires_snapshot = BTreeMap::new();

    for (_id, export) in &dataflow.sink_exports {
        *downstream_requires_snapshot
            .entry(Id::Global(export.from))
            .or_default() |= export.with_snapshot;
    }
    for (_id, (export, _typ)) in &dataflow.index_exports {
        *downstream_requires_snapshot
            .entry(Id::Global(export.on_id))
            .or_default() |= true;
    }
    for BuildDesc { id: _, plan } in dataflow.objects_to_build.iter().rev() {
        // For now, we treat all intermediate nodes as potentially requiring a snapshot.
        // Walk the AST, marking anything depended on by a compute object as snapshot-required.
        let mut todo = vec![(true, &plan.0)];
        while let Some((requires_snapshot, expr)) = todo.pop() {
            match expr {
                MirRelationExpr::Get { id, .. } => {
                    *downstream_requires_snapshot.entry(*id).or_default() |= requires_snapshot;
                }
                other => {
                    todo.extend(other.children().rev().map(|c| (true, c)));
                }
            }
        }
    }
    for (id, import) in &mut dataflow.source_imports {
        let with_snapshot = downstream_requires_snapshot
            .entry(Id::Global(*id))
            .or_default();

        // As above, fetch the snapshot if there are any transformations on the raw source data.
        // (And we'll always need to check for things like temporal filters, since those allow
        // snapshot data to affect diffs at times past the as-of.)
        *with_snapshot |= import.desc.arguments.operators.is_some();

        import.with_snapshot = *with_snapshot;
    }
    for (_id, import) in &mut dataflow.index_imports {
        let with_snapshot = downstream_requires_snapshot
            .entry(Id::Global(import.desc.on_id))
            .or_default();

        import.with_snapshot = *with_snapshot;
    }

    Ok(())
}

/// Restricts the indexes imported by `dataflow` to only the ones it needs.
/// It also records in the `DataflowMetainfo` how each index will be used.
/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
/// their index usage types.
///
/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
/// references.
///
/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
/// `ArrangeBy`s at the top of unused Let bindings.
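///
/// A sketch of the approach: walk the plans to collect, for each `GlobalId`,
/// the arrangement keys that are requested and how each will be used (lookup,
/// full scan, ...); add requests implied by sink and index exports; then drop
/// every `IndexImport` that received no request, and record the usage types in
/// the `DataflowMetainfo`.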
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "index_imports")
)]
fn prune_and_annotate_dataflow_index_imports(
    dataflow: &mut DataflowDesc,
    indexes: &dyn IndexOracle,
    dataflow_metainfo: &mut DataflowMetainfo,
) -> Result<(), TransformError> {
    // Preparation.
    // Let's save the unique keys of the sources. This will inform which indexes to choose for full
    // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
    // sources that are not getting an indexed read.)
    let mut source_keys = BTreeMap::new();
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ,
                    ..
                } => {
                    source_keys.entry(*global_id).or_insert_with(|| {
                        typ.keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
                                    // later we can more easily compare index keys to these keys.
                                    .map(|col_idx| MirScalarExpr::column(*col_idx))
                                    .collect()
                            })
                            .collect()
                    });
                }
                _ => {}
            });
    }

    // This will be a mapping of
    // (ids used by exports and objects to build) ->
    // (arrangement keys and usage types on that id that have been requested)
    let mut index_reqs_by_id = BTreeMap::new();

    // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
    // for which we also have an available index.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
            .collect_index_reqs(build_desc.plan.as_inner_mut())?;
    }

    // Collect index usages by `sink_exports`.
    // A sink export sometimes wants to directly use an imported index. I know of one case where
    // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
    // `objects_to_build`, but will want to directly read from the index and write to a sink.
    for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
        // First, let's see if there exists an index on the id that the sink wants. If not, there is
        // nothing we can do here.
        if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
            // If yes, then we'll add a request of _some_ index: If we already collected an index
            // request on this id, then use that, otherwise use the above arbitrarily picked index.
            let requested_idxs = index_reqs_by_id
                .entry(sink_desc.from)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.first() {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::SinkExport,
                ));
            } else {
                requested_idxs.push((
                    idx_id,
                    arbitrary_idx_key.to_owned(),
                    IndexUsageType::SinkExport,
                ));
            }
        }
    }

    // Collect index usages by `index_exports`.
    for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
        // First, let's see if there exists an index on the id that the exported index is on. If
        // not, there is nothing we can do here.
        if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
            // If yes, then we'll add an index request of some index: If we already collected an
            // index request on this id, then use that, otherwise use the above arbitrarily picked
            // index.
            let requested_idxs = index_reqs_by_id
                .entry(index_desc.on_id)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.first() {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::IndexExport,
                ));
            } else {
                // This is surprising: An index creation dataflow always has a plan in
                // `objects_to_build` that will have a Get of the object that the index is on (see
                // `DataflowDescription::export_index`). Therefore, we should have already requested
                // an index usage when seeing that Get in `CollectIndexRequests`.
                soft_panic_or_log!(
                    "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
                );
                requested_idxs.push((
                    idx_id,
                    arbitrary_index_key.to_owned(),
                    IndexUsageType::IndexExport,
                ));
            }
        }
    }

    // By now, `index_reqs_by_id` has all the ids that we think might benefit from having an index.
    // Moreover, for each of these ids, if any index exists on it, then we should have already
    // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
    // Unknown usage, picking an arbitrary index.
    for (id, index_reqs) in index_reqs_by_id.iter_mut() {
        if index_reqs.is_empty() {
            // Try to pick an arbitrary index to be fully scanned.
            if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
                soft_panic_or_log!(
                    "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
id: {}, key: {:?}",
                    id,
                    key
                );
                index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
            }
        }
    }

    // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
    // request on the same id.
    // `full_scan_changes` saves the changes that we make: Each (Get id, index id) entry indicates
    // that if a Get has that id, then any full scan index accesses on it should be changed to use
    // the indicated index id.
    let mut full_scan_changes = BTreeMap::new();
    for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
        // Let's choose a non-FullScan access (if one exists).
        if let Some((picked_idx, picked_idx_key)) = choose_index(
            &source_keys,
            get_id,
            &index_reqs
                .iter()
                .filter_map(|(idx_id, key, usage_type)| match usage_type {
                    IndexUsageType::FullScan => None,
                    _ => Some((*idx_id, key.clone())),
                })
                .collect_vec(),
        ) {
            // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
            for (idx_id, key, usage_type) in index_reqs {
                match usage_type {
                    IndexUsageType::FullScan => {
                        full_scan_changes.insert(get_id, picked_idx);
                        *idx_id = picked_idx;
                        key.clone_from(&picked_idx_key);
                    }
                    _ => {}
                }
            }
        }
    }
    // Apply the above full scan changes also to the Gets.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| {
                match expr {
                    MirRelationExpr::Get {
                        id: Id::Global(global_id),
                        typ: _,
                        access_strategy: persist_or_index,
                    } => {
                        if let Some(new_idx_id) = full_scan_changes.get(global_id) {
                            match persist_or_index {
                                AccessStrategy::UnknownOrLocal => {
                                    // Should have been already filled by `collect_index_reqs`.
                                    unreachable!()
                                }
                                AccessStrategy::Persist => {
                                    // We already know that it's an indexed access.
                                    unreachable!()
                                }
                                AccessStrategy::SameDataflow => {
                                    // We have not added such annotations yet.
                                    unreachable!()
                                }
                                AccessStrategy::Index(accesses) => {
                                    for (idx_id, usage_type) in accesses {
                                        if matches!(usage_type, IndexUsageType::FullScan) {
                                            *idx_id = *new_idx_id;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    _ => {}
                }
            });
    }

    // Annotate index imports by their usage types.
    dataflow_metainfo.index_usage_types = BTreeMap::new();
    for (
        index_id,
        IndexImport {
            desc: index_desc,
            typ: _,
            monotonic: _,
            with_snapshot: _,
        },
    ) in dataflow.index_imports.iter_mut()
    {
        // A sanity check that we are not importing an index that we are also exporting.
        assert!(
            !dataflow
                .index_exports
                .iter()
                .map(|(exported_index_id, _)| exported_index_id)
                .any(|exported_index_id| exported_index_id == index_id)
        );

        let mut new_usage_types = Vec::new();
        // Let's see whether something has requested an index on this object that this imported
        // index is on.
        if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
            for (req_idx_id, req_key, req_usage_type) in index_reqs {
                if req_idx_id == index_id {
                    soft_assert_eq_or_log!(*req_key, index_desc.key);
                    new_usage_types.push(req_usage_type.clone());
                }
            }
        }
        if !new_usage_types.is_empty() {
            dataflow_metainfo
                .index_usage_types
                .insert(*index_id, new_usage_types);
        }
    }

    // Prune index imports to only those that are used.
    dataflow
        .index_imports
        .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));

    // Determine AccessStrategy::SameDataflow accesses. These were classified as
    // AccessStrategy::Persist inside collect_index_reqs, so now we check these, and if the id is of
    // a collection that we are building ourselves, then we adjust the access strategy.
    let mut objects_to_build_ids = BTreeSet::new();
    for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
        objects_to_build_ids.insert(*id);
    }
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ: _,
                    access_strategy,
                } => match access_strategy {
                    AccessStrategy::Persist => {
                        if objects_to_build_ids.contains(global_id) {
                            *access_strategy = AccessStrategy::SameDataflow;
                        }
                    }
                    _ => {}
                },
                _ => {}
            });
    }

    // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(_),
                    typ: _,
                    access_strategy: AccessStrategy::Index(accesses),
                } => {
                    for (idx_id, _) in accesses {
                        soft_assert_or_log!(
                            dataflow.index_imports.contains_key(idx_id),
                            "Dangling Get index annotation"
                        );
                    }
                }
                _ => {}
            });
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Picks an index from a given Vec of (index id, index key) pairs.
///
/// Currently, we pick as follows:
/// - If there is an index on a unique key, then we pick that. (It might be better distributed, and
///   is less likely to get dropped than other indexes.)
/// - Otherwise, we pick an arbitrary index.
///
/// TODO: There are various edge cases where a better choice would be possible:
/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
///   capture this already.)
/// - Some indexes might have an error, while others don't.
///   <https://github.com/MaterializeInc/database-issues/issues/4455>
/// - Some indexes might have more extra data in their keys (because of being on more complicated
///   expressions than just column references), which won't be used in a full scan.
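///
/// For example (hypothetical ids): if the collection has a unique key `[#0]`,
/// and indexes `i1` on `[#1]` and `i0` on `[#0]` are available, we pick `i0`,
/// because its key is a unique key of the collection.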
fn choose_index(
    source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    id: &GlobalId,
    indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
    match source_keys.get(id) {
        None => indexes.first().cloned(), // pick an arbitrary index
        Some(coll_keys) => match indexes
            .iter()
            .find(|(_idx_id, key)| coll_keys.contains(key))
        {
            Some((idx_id, key)) => Some((*idx_id, key.clone())),
            None => indexes.first().cloned(), // pick an arbitrary index
        },
    }
}

#[derive(Debug)]
struct CollectIndexRequests<'a> {
    /// We were told about these unique keys on sources.
    source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    /// We were told about these indexes being available.
    indexes_available: &'a dyn IndexOracle,
    /// We'll be collecting index requests here.
    index_reqs_by_id:
        &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
    /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
    /// current node (see docs on `IndexUsageContext` about what context we keep).
    /// Moreover, we need to propagate this context from cte uses to cte definitions.
    /// `context_across_lets` accumulates, per `LocalId`, all the contexts that reached any use
    /// of that binding.
    context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
    recursion_guard: RecursionGuard,
}

impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

impl<'a> CollectIndexRequests<'a> {
    fn new(
        source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
        indexes_available: &'a dyn IndexOracle,
        index_reqs_by_id: &'a mut BTreeMap<
            GlobalId,
            Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
        >,
    ) -> CollectIndexRequests<'a> {
        CollectIndexRequests {
            source_keys,
            indexes_available,
            index_reqs_by_id,
            context_across_lets: BTreeMap::new(),
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    pub fn collect_index_reqs(
        &mut self,
        expr: &mut MirRelationExpr,
    ) -> Result<(), RecursionLimitError> {
        assert!(self.context_across_lets.is_empty());
        self.collect_index_reqs_inner(
            expr,
            &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
        )?;
        assert!(self.context_across_lets.is_empty());
        // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
        for (_id, index_reqs) in self.index_reqs_by_id.iter() {
            for (_, _, index_usage_type) in index_reqs {
                soft_assert_or_log!(
                    !matches!(
                        index_usage_type,
                        IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
                    ),
                    "Delta join Unknown index usage remained"
                );
            }
        }
        Ok(())
    }

    fn collect_index_reqs_inner(
        &mut self,
        expr: &mut MirRelationExpr,
        contexts: &Vec<IndexUsageContext>,
    ) -> Result<(), RecursionLimitError> {
        self.checked_recur_mut(|this| {

            // If an index exists on `on_id`, this function picks an index to be fully scanned.
            let pick_index_for_full_scan = |on_id: &GlobalId| {
                // Note that the choice we make here might be modified later at the
                // "Adjust FullScans to not introduce a new index dependency".
                choose_index(
                    this.source_keys,
                    on_id,
                    &this
                        .indexes_available
                        .indexes_on(*on_id)
                        .map(|(idx_id, key)| (idx_id, key.iter().cloned().collect_vec()))
                        .collect_vec(),
                )
            };

            // See comment on `IndexUsageContext`.
            Ok(match expr {
                MirRelationExpr::Join {
                    inputs,
                    implementation,
                    ..
                } => {
                    match implementation {
                        JoinImplementation::Differential(..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(
                                        IndexUsageType::DifferentialJoin,
                                    ),
                                )?;
                            }
                        }
                        JoinImplementation::DeltaQuery(..) => {
                            // For Delta joins, the first input is special, see
                            // https://github.com/MaterializeInc/database-issues/issues/2115
                            this.collect_index_reqs_inner(
                                &mut inputs[0],
                                &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                    DeltaJoinIndexUsageType::Unknown,
                                )),
                            )?;
                            for input in &mut inputs[1..] {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                        DeltaJoinIndexUsageType::Lookup,
                                    )),
                                )?;
                            }
                        }
                        JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(*idx_id)),
                                )?;
                            }
                        }
                        JoinImplementation::Unimplemented => {
                            soft_panic_or_log!(
                                "CollectIndexRequests encountered an Unimplemented join"
                            );
                        }
                    }
                }
                MirRelationExpr::ArrangeBy { input, keys } => {
                    this.collect_index_reqs_inner(
                        input,
                        &IndexUsageContext::add_keys(contexts, keys),
                    )?;
                }
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    access_strategy: persist_or_index,
                    ..
                } => {
                    this.index_reqs_by_id
                        .entry(*global_id)
                        .or_insert_with(Vec::new);
                    // If the context is empty, it means we didn't see an operator that would
                    // specifically want to use an index for this Get. However, let's still try to
                    // find an index for a full scan.
                    let mut try_full_scan = contexts.is_empty();
                    let mut index_accesses = Vec::new();
                    for context in contexts {
                        match &context.requested_keys {
                            None => {
                                // We have some index usage context, but didn't see an `ArrangeBy`.
                                try_full_scan = true;
                                match context.usage_type {
                                    IndexUsageType::FullScan | IndexUsageType::SinkExport | IndexUsageType::IndexExport => {
                                        // Not possible, because these don't go through
                                        // IndexUsageContext at all.
                                        unreachable!()
                                    },
                                    // You can find more info on why the following join cases
                                    // shouldn't happen in comments of the Join lowering to LIR.
                                    IndexUsageType::Lookup(_) => soft_panic_or_log!("CollectIndexRequests encountered an IndexedFilter join without an ArrangeBy"),
                                    IndexUsageType::DifferentialJoin => soft_panic_or_log!("CollectIndexRequests encountered a Differential join without an ArrangeBy"),
                                    IndexUsageType::DeltaJoin(_) => soft_panic_or_log!("CollectIndexRequests encountered a Delta join without an ArrangeBy"),
                                    IndexUsageType::PlanRootNoArrangement => {
                                        // This is ok: the entire plan is a `Get`, with not even an
                                        // `ArrangeBy`. Note that if an index exists, the usage will
                                        // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
                                        // because we are going into the `try_full_scan` if.
                                    },
                                    IndexUsageType::FastPathLimit => {
                                        // These are created much later, not even inside
                                        // `prune_and_annotate_dataflow_index_imports`.
                                        unreachable!()
                                    }
                                    IndexUsageType::DanglingArrangeBy => {
                                        // Not possible, because we create `DanglingArrangeBy`
                                        // only when we see an `ArrangeBy`.
                                        unreachable!()
                                    },
                                    IndexUsageType::Unknown => {
                                        // These are added only after `CollectIndexRequests` has run.
                                        unreachable!()
                                    }
                                }
                            }
                            Some(requested_keys) => {
                                for requested_key in requested_keys {
                                    match this
                                        .indexes_available
                                        .indexes_on(*global_id)
                                        .find(|(available_idx_id, available_key)| {
                                            match context.usage_type {
                                                IndexUsageType::Lookup(req_idx_id) => {
                                                    // `LiteralConstraints` already picked an index
                                                    // by id. Let's use that one.
                                                    assert!(!(available_idx_id == &req_idx_id
                                                        && available_key != &requested_key));
                                                    available_idx_id == &req_idx_id
                                                }
                                                _ => available_key == &requested_key,
                                            }
                                        })
                                    {
                                        Some((idx_id, key)) => {
                                            this.index_reqs_by_id
                                                .get_mut(global_id)
                                                .unwrap()
                                                .push((idx_id, key.to_owned(), context.usage_type.clone()));
                                            index_accesses.push((idx_id, context.usage_type.clone()));
                                        }
                                        None => {
                                            // If there is a key requested for which we don't have an
                                            // index, then we might still be able to do a full scan of a
                                            // differently keyed index.
                                            try_full_scan = true;
                                        }
                                    }
                                }
                                if requested_keys.is_empty() {
                                    // It's a bit weird if an MIR ArrangeBy is not requesting any
                                    // key, but let's try a full scan in that case anyhow.
                                    try_full_scan = true;
                                }
                            }
                        }
                    }
                    if try_full_scan {
                        // Keep in mind that when we have 2 contexts coming from 2 uses of a Let,
                        // this code can't distinguish between the case when there is 1 ArrangeBy at
                        // the top of the Let, and when the 2 uses each have an `ArrangeBy`. In both
                        // cases, we'll add only 1 full scan, which would be wrong in the latter
                        // case. However, the latter case can't currently happen until we do
                        // https://github.com/MaterializeInc/database-issues/issues/6363
                        // Also note that currently we are deduplicating index usage types when
                        // printing index usages in EXPLAIN.
                        if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
                            this.index_reqs_by_id
                                .get_mut(global_id)
                                .unwrap()
                                .push((idx_id, key.to_owned(), IndexUsageType::FullScan));
                            index_accesses.push((idx_id, IndexUsageType::FullScan));
                        }
                    }
                    if index_accesses.is_empty() {
                        *persist_or_index = AccessStrategy::Persist;
                    } else {
                        *persist_or_index = AccessStrategy::Index(index_accesses);
                    }
                }
                MirRelationExpr::Get {
                    id: Id::Local(local_id),
                    ..
                } => {
                    // Add the current context to the vector of contexts of `local_id`.
                    // (The unwrap is safe, because the Let and LetRec cases start with inserting an
                    // empty entry.)
                    this.context_across_lets
                        .get_mut(local_id)
                        .unwrap()
                        .extend(contexts.iter().cloned());
                    // No recursive call here, because Get has no inputs.
                }
                MirRelationExpr::Let { id, value, body } => {
                    let shadowed_context = this.context_across_lets.insert(*id, Vec::new());
                    // No shadowing in MIR
                    assert_none!(shadowed_context);
                    // We go backwards: Recurse on the body and then the value.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // The above call filled in the entry for `id` in `context_across_lets` (if it
                    // was referenced). Anyhow, at least an empty entry should exist, because we
                    // started above with inserting it.
                    this.collect_index_reqs_inner(value, &this.context_across_lets[id].clone())?;
                    // Clean up the id from the saved contexts.
                    this.context_across_lets.remove(id);
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits: _,
                    body,
                } => {
                    for id in ids.iter() {
                        let shadowed_context = this.context_across_lets.insert(*id, Vec::new());
                        assert_none!(shadowed_context); // No shadowing in MIR
                    }
                    // We go backwards: Recurse on the body first.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // Reset the contexts of the ids (of the current LetRec), because an arrangement
                    // from a value can't be used in the body.
                    for id in ids.iter() {
                        *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
                    }
                    // Recurse on the values in reverse order.
                    // Note that we do only one pass, i.e., we won't see context through a Get that
                    // refers to the previous iteration. But this is ok, because we can't reuse
                    // arrangements across iterations anyway.
                    for (id, value) in ids.iter().rev().zip_eq(values.iter_mut().rev()) {
                        this.collect_index_reqs_inner(value, &this.context_across_lets[id].clone())?;
                    }
                    // Clean up the ids from the saved contexts.
                    for id in ids {
                        this.context_across_lets.remove(id);
                    }
                }
                _ => {
                    // Nothing interesting at this node, recurse with the empty context (regardless
                    // of what context we got from above).
                    let empty_context = Vec::new();
                    for child in expr.children_mut() {
                        this.collect_index_reqs_inner(child, &empty_context)?;
                    }
                }
            })
        })
    }
}

/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
/// looking for is
/// ```text
/// <operation that uses an index>
///   ArrangeBy <requested_keys>
///     Get <global_id>
/// ```
/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
/// pattern is present above the `Get`.
///
/// Note that we usually put this struct in a Vec, because we track context across local let
/// bindings, which means that a node can have multiple parents.
#[derive(Debug, Clone)]
struct IndexUsageContext {
    usage_type: IndexUsageType,
    requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
}

impl IndexUsageContext {
    pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
        vec![IndexUsageContext {
            usage_type,
            requested_keys: None,
        }]
    }

    // Adds the keys of an ArrangeBy into the contexts.
    // Soft-panics if we haven't already seen something that indicates what the index will be
    // used for.
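    //
    // For example (a sketch): under a delta join context that is still
    // `Unknown`, an `ArrangeBy` with keys `[[#0], [#1]]` is split into a
    // `FirstInputFullScan` context on `[#0]` (the minimum key, mirroring the
    // choice in `DeltaJoinPlan::create_from`) and a `Lookup` context on
    // `[#1]`.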
    pub fn add_keys(
        old_contexts: &Vec<IndexUsageContext>,
        keys_to_add: &Vec<Vec<MirScalarExpr>>,
    ) -> Vec<IndexUsageContext> {
        let old_contexts = if old_contexts.is_empty() {
            // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
            soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
            // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
            // have a place to note down the requested keys below.
            IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
        } else {
            old_contexts.clone()
        };
        old_contexts
            .into_iter()
            .flat_map(|old_context| {
                if !matches!(
                    old_context.usage_type,
                    IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
                ) {
                    // If it's not an unknown delta join usage, then we simply note down the new
                    // keys into `requested_keys`.
                    let mut context = old_context.clone();
                    if context.requested_keys.is_none() {
                        context.requested_keys = Some(BTreeSet::new());
                    }
                    context
                        .requested_keys
                        .as_mut()
                        .unwrap()
                        .extend(keys_to_add.iter().cloned());
                    // (The `.chain(None)` only makes the types of the two branches line up.)
                    Some(context).into_iter().chain(None)
                } else {
                    // If it's an unknown delta join usage, then we need to figure out whether this
                    // is a full scan or a lookup.
                    //
                    // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
                    // scan when starting the rendering of a delta path. This is the one for which
                    // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
                    //
                    // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
                    // the `source_key` based on the MIR plan. We do this by doing the same as
                    // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
                    let source_key = keys_to_add
                        .iter()
                        .min()
                        .expect("ArrangeBy below a delta join has at least one key");
                    let full_scan_context = IndexUsageContext {
                        requested_keys: Some(BTreeSet::from([source_key.clone()])),
                        usage_type: IndexUsageType::DeltaJoin(
                            DeltaJoinIndexUsageType::FirstInputFullScan,
                        ),
                    };
                    let lookup_keys = keys_to_add
                        .iter()
                        .filter(|key| *key != source_key)
                        .cloned()
                        .collect_vec();
                    if lookup_keys.is_empty() {
                        Some(full_scan_context).into_iter().chain(None)
                    } else {
                        let lookup_context = IndexUsageContext {
                            requested_keys: Some(lookup_keys.into_iter().collect()),
                            usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
                        };
                        Some(full_scan_context)
                            .into_iter()
                            .chain(Some(lookup_context))
                    }
                }
            })
            .collect()
    }
}

/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
    /// Notices that the optimizer wants to show to users.
    /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
    pub optimizer_notices: Vec<Notice>,
    /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
    /// `prune_and_annotate_dataflow_index_imports`.
    pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
}

impl<Notice> Default for DataflowMetainfo<Notice> {
    fn default() -> Self {
        DataflowMetainfo {
            optimizer_notices: Vec::new(),
            index_usage_types: BTreeMap::new(),
        }
    }
}

impl<Notice> DataflowMetainfo<Notice> {
    /// Create a [`UsedIndexes`] instance by resolving each index import `id`
    /// of the given `df_desc` against an entry expected to exist in
    /// [`DataflowMetainfo::index_usage_types`].
    pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
        UsedIndexes::new(
            df_desc
                .index_imports
                .iter()
                .map(|(id, _)| {
                    let entry = self.index_usage_types.get(id).cloned();
                    // If an entry does not exist, mark the usage type for this
                    // index as `Unknown`.
                    //
                    // This should never happen if this method is called after
                    // running `prune_and_annotate_dataflow_index_imports` on
                    // the dataflow (this happens at the end of the
                    // `optimize_dataflow` call).
                    let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);

                    (*id, index_usage_type)
                })
                .collect(),
        )
    }
}

impl DataflowMetainfo<RawOptimizerNotice> {
    /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
    /// only if the exact same notice is not already present.
    pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
    where
        T: Into<RawOptimizerNotice>,
    {
        let notice = notice.into();
        if !self.optimizer_notices.contains(&notice) {
            self.optimizer_notices.push(notice);
        }
    }
}