mz_transform/dataflow.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Whole-dataflow optimization
//!
//! A dataflow may contain multiple views, each of which may only be
//! optimized locally. However, information like demand and predicate
//! pushdown can be applied across views once we understand the context
//! in which the views will be executed.
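//!
//! At a high level, `optimize_dataflow` runs roughly the following sequence
//! (see the function body for the authoritative order): inlining of
//! single-use views, a logical optimization pass, cross-view predicate
//! pushdown, cross-view demand analysis, a logical cleanup pass, physical
//! optimization, monotonicity propagation, and finally pruning and
//! annotation of index imports.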

use std::collections::{BTreeMap, BTreeSet};

use itertools::Itertools;
use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
use mz_expr::{
    AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
};
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::GlobalId;
use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

use crate::monotonic::MonotonicFlag;
use crate::notice::RawOptimizerNotice;
use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};

/// Optimizes the implementation of each dataflow.
///
/// Inlines views, performs a full optimization pass including physical
/// planning using the supplied indexes, propagates filtering and projection
/// information to dataflow sources and lifts monotonicity information.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "global")
)]
pub fn optimize_dataflow(
    dataflow: &mut DataflowDesc,
    transform_ctx: &mut TransformCtx,
    fast_path_optimizer: bool,
) -> Result<(), TransformError> {
    // Inline views that are used in only one other view.
    inline_views(dataflow)?;

    if fast_path_optimizer {
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::fast_path_optimizer(transform_ctx),
            transform_ctx,
        )?;
    } else {
        // Logical optimization pass after view inlining
        optimize_dataflow_relations(
            dataflow,
            #[allow(deprecated)]
            &Optimizer::logical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        optimize_dataflow_filters(dataflow)?;
        // TODO: When the linear operator contract ensures that propagated
        // predicates are always applied, projections and filters can be
        // removed from where they come from. Once that is possible, it would
        // be useful to optimize demand after filters, so that demand only
        // includes the columns that are still necessary after the filters
        // are applied.
        optimize_dataflow_demand(dataflow)?;

        // A smaller logical optimization pass after projections and filters are
        // pushed down across views.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::logical_cleanup_pass(transform_ctx, false),
            transform_ctx,
        )?;

        // Physical optimization pass
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::physical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        optimize_dataflow_monotonic(dataflow, transform_ctx)?;
    }

    prune_and_annotate_dataflow_index_imports(
        dataflow,
        transform_ctx.indexes,
        transform_ctx.df_meta,
    )?;

    // Warning: If you want to add a transform call here, consider very carefully whether it
    // could accidentally invalidate information that we already derived above in
    // `optimize_dataflow_monotonic` or `prune_and_annotate_dataflow_index_imports`.

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Inlines views that are used in exactly one other view, and in no exported objects.
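///
/// For example (a minimal sketch with hypothetical views), if `v1` is used
/// only by `v2` and no export references `v1`, the two build objects are
/// fused into one, with `v1`'s plan spliced in as a `Let` binding:
///
/// ```text
/// Before: objects_to_build = [v1: <plan1>, v2: <plan2 with Get(v1)>]
/// After:  objects_to_build = [v2: Let l0 = <plan1> in <plan2 with Get(l0)>]
/// ```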
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "inline_views")
)]
fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // We cannot inline anything whose `BuildDesc::id` appears in either the
    // `index_exports` or `sink_exports` of `dataflow`, because we lose our
    // ability to name it.

    // A view can / should be inlined into another view if it is only used by
    // one subsequent view. If there are two distinct referencing views that
    // have not themselves been merged, then too bad and it doesn't get
    // inlined.

    // Starting from the *last* object to build, walk backwards and inline
    // any view that is referenced neither by `index_exports` nor by
    // `sink_exports`, and by exactly one of the remaining objects to build.

    for index in (0..dataflow.objects_to_build.len()).rev() {
        // Capture the name used by others to reference this view.
        let global_id = dataflow.objects_to_build[index].id;
        // Determine if any exports directly reference this view.
        let mut occurs_in_export = false;
        for (_gid, sink_desc) in dataflow.sink_exports.iter() {
            if sink_desc.from == global_id {
                occurs_in_export = true;
            }
        }
        for (_, (index_desc, _)) in dataflow.index_exports.iter() {
            if index_desc.on_id == global_id {
                occurs_in_export = true;
            }
        }
        // Count the number of subsequent views that reference this view.
        let mut occurrences_in_later_views = Vec::new();
        for other in (index + 1)..dataflow.objects_to_build.len() {
            if dataflow.objects_to_build[other]
                .plan
                .depends_on()
                .contains(&global_id)
            {
                occurrences_in_later_views.push(other);
            }
        }
        // Inline if the view is referenced by exactly one later view and by no exports.
        if !occurs_in_export && occurrences_in_later_views.len() == 1 {
            let other = occurrences_in_later_views[0];
            // We can remove this view and insert it in the later view,
            // but are not able to relocate the later view `other`.

            // When splicing in the `index` view, we need to create disjoint
            // identifiers for the Let's `body` and `value`, as well as a new
            // identifier for the binding itself. Following `NormalizeLets`, we
            // go with the binding first, then the value, then the body.
            let mut id_gen = crate::IdGen::default();
            let new_local = LocalId::new(id_gen.allocate_id());
            // Use the same `id_gen` to assign new identifiers to `index`.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[index].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Assign new identifiers to the other relation.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[other].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Install the `new_local` name wherever `global_id` was used.
            dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .visit_pre_mut(|expr| {
                    if let MirRelationExpr::Get { id, .. } = expr {
                        if id == &Id::Global(global_id) {
                            *id = Id::Local(new_local);
                        }
                    }
                });

            // With identifiers rewritten, we can replace `other` with
            // a `MirRelationExpr::Let` binding, whose value is `index` and
            // whose body is `other`.
            let body = dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .take_dangerous();
            let value = dataflow.objects_to_build[index]
                .plan
                .as_inner_mut()
                .take_dangerous();
            *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
                id: new_local,
                value: Box::new(value),
                body: Box::new(body),
            };
            dataflow.objects_to_build.remove(index);
        }
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Performs either the logical or the physical optimization pass on the
/// dataflow using the supplied set of indexes.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = optimizer.name)
)]
fn optimize_dataflow_relations(
    dataflow: &mut DataflowDesc,
    optimizer: &Optimizer,
    ctx: &mut TransformCtx,
) -> Result<(), TransformError> {
    // Re-optimize each object built by the dataflow.
    for object in dataflow.objects_to_build.iter_mut() {
        // Re-run all optimizations on the composite views.
        ctx.set_global_id(object.id);
        optimizer.transform(object.plan.as_inner_mut(), ctx)?;
        ctx.reset_global_id();
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes demand information from published outputs to dataflow inputs,
/// projecting away unnecessary columns.
///
/// Dataflows that exist for the sake of generating plan explanations do not
/// have published outputs. In this case, we push demand information from views
/// not depended on by other views to dataflow inputs.
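///
/// For example (a minimal sketch with hypothetical ids), if the dataflow
/// exports a sink reading from view `v`, then all columns `0..arity(v)` are
/// recorded as demanded from `v`, and `optimize_dataflow_demand_inner` then
/// narrows the demand as it walks backwards through the views feeding `v`.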
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "demand")
)]
fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // Maps id -> union of known columns demanded from the source/view with the
    // corresponding id.
    let mut demand = BTreeMap::new();

    if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
        // In the absence of any exports, just demand all columns from views
        // that are not depended on by another view, which is currently the last
        // object in `objects_to_build`.

        // A DataflowDesc without exports is currently created in the context of
        // EXPLAIN outputs. This ensures that the output has all the columns of
        // the original explainee.
        if let Some(build_desc) = dataflow.objects_to_build.last_mut() {
            demand
                .entry(Id::Global(build_desc.id))
                .or_insert_with(BTreeSet::new)
                .extend(0..build_desc.plan.as_inner_mut().arity());
        }
    } else {
        // Demand all columns of inputs to sinks.
        for (_id, sink) in dataflow.sink_exports.iter() {
            let input_id = sink.from;
            demand
                .entry(Id::Global(input_id))
                .or_insert_with(BTreeSet::new)
                .extend(0..dataflow.arity_of(&input_id));
        }

        // Demand all columns of inputs to exported indexes.
        for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
            let input_id = desc.on_id;
            demand
                .entry(Id::Global(input_id))
                .or_insert_with(BTreeSet::new)
                .extend(0..dataflow.arity_of(&input_id));
        }
    }

    optimize_dataflow_demand_inner(
        dataflow
            .objects_to_build
            .iter_mut()
            .rev()
            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
        &mut demand,
    )?;

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes demand through views in `view_sequence` in order, removing
/// columns not demanded.
///
/// This method is made public for the sake of testing.
/// TODO: make this private once we allow multiple exports per dataflow.
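///
/// For example (a sketch with hypothetical ids), given the sequence
/// `[(v2, plan2), (v1, plan1)]` (consumers first) and an initial demand of
/// columns `{1}` on `v2`, a projection onto column 1 is pushed down through
/// `plan2`, which in turn records in `demand` whichever columns of `v1` that
/// projection still requires before `plan1` is visited.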
pub fn optimize_dataflow_demand_inner<'a, I>(
    view_sequence: I,
    demand: &mut BTreeMap<Id, BTreeSet<usize>>,
) -> Result<(), TransformError>
where
    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
{
    // Maps id -> The projection that was pushed down on the view with the
    // corresponding id.
    let mut applied_projection = BTreeMap::new();
    // Collect the mutable references to views after pushing projection down
    // in order to run cleanup actions on them in a second loop.
    let mut view_refs = Vec::new();
    let projection_pushdown = crate::movement::ProjectionPushdown::default();
    for (id, view) in view_sequence {
        if let Some(columns) = demand.get(&id) {
            let projection_pushed_down = columns.iter().copied().collect();
            // Push down the projection consisting of the entries of `columns`
            // in increasing order.
            projection_pushdown.action(view, &projection_pushed_down, demand)?;
            let new_type = view.typ();
            applied_projection.insert(id, (projection_pushed_down, new_type));
        }
        view_refs.push(view);
    }

    for view in view_refs {
        // Update `Get` nodes to reflect any columns that have been projected away.
        projection_pushdown.update_projection_around_get(view, &applied_projection);
    }

    Ok(())
}

/// Pushes predicates to dataflow inputs.
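///
/// For example (a minimal sketch), a predicate such as `#0 > 5` that
/// predicate pushdown propagates all the way to an imported source is
/// appended to that source's `MapFilterProject`, so the filter can be
/// applied as the data is read rather than in a downstream operator.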
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "filters")
)]
fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // Contains id -> predicates map, describing those predicates that
    // can (but need not) be applied to the collection named by `id`.
    let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();

    // Propagate predicate information from outputs to inputs.
    optimize_dataflow_filters_inner(
        dataflow
            .objects_to_build
            .iter_mut()
            .rev()
            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
        &mut predicates,
    )?;

    // Push predicate information into the SourceDesc.
    for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
        if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
            if !list.is_empty() {
                // Canonicalize the order of predicates, for stable plans.
                let mut list = list.into_iter().collect::<Vec<_>>();
                list.sort();
                // Install no-op predicate information if none exists.
                if source.arguments.operators.is_none() {
                    source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
                }
                // Add any predicates that can be pushed to the source.
                if let Some(operator) = source.arguments.operators.take() {
                    let mut operator = operator.filter(list);
                    operator.optimize();
                    source.arguments.operators = Some(operator);
                }
            }
        }
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes filters down through views in `view_iter` in order.
///
/// This method is made public for the sake of testing.
/// TODO: make this private once we allow multiple exports per dataflow.
pub fn optimize_dataflow_filters_inner<'a, I>(
    view_iter: I,
    predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
) -> Result<(), TransformError>
where
    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
{
    let transform = crate::predicate_pushdown::PredicatePushdown::default();
    for (id, view) in view_iter {
        if let Some(list) = predicates.get(&id) {
            if !list.is_empty() {
                *view = view.take_dangerous().filter(list.iter().cloned());
            }
        }
        transform.action(view, predicates)?;
    }
    Ok(())
}

/// Propagates information about monotonic inputs through operators,
/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "monotonic")
)]
pub fn optimize_dataflow_monotonic(
    dataflow: &mut DataflowDesc,
    ctx: &mut TransformCtx,
) -> Result<(), TransformError> {
    let mut monotonic_ids = BTreeSet::new();
    for (source_id, (_source, is_monotonic, _upper)) in dataflow.source_imports.iter() {
        if *is_monotonic {
            monotonic_ids.insert(source_id.clone());
        }
    }
    for (
        _index_id,
        IndexImport {
            desc: index_desc,
            monotonic,
            ..
        },
    ) in dataflow.index_imports.iter()
    {
        if *monotonic {
            monotonic_ids.insert(index_desc.on_id.clone());
        }
    }

    let monotonic_flag = MonotonicFlag::default();

    for build_desc in dataflow.objects_to_build.iter_mut() {
        monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Restricts the indexes imported by `dataflow` to only the ones it needs.
/// It also adds to the `DataflowMetainfo` how each index will be used.
/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
/// their index usage types.
///
/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
/// references.
///
/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
/// `ArrangeBy`s at the top of unused Let bindings.
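///
/// For example (a sketch with hypothetical ids), if the only use of an
/// imported index `u2` on collection `u1` is a `Get u1` under a differential
/// join's `ArrangeBy` with a matching key, then `index_usage_types` records
/// `u2 => [DifferentialJoin]`, the `Get` is annotated with
/// `AccessStrategy::Index([(u2, DifferentialJoin)])`, and all other imported
/// indexes on `u1` are pruned from `index_imports`.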
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "index_imports")
)]
fn prune_and_annotate_dataflow_index_imports(
    dataflow: &mut DataflowDesc,
    indexes: &dyn IndexOracle,
    dataflow_metainfo: &mut DataflowMetainfo,
) -> Result<(), TransformError> {
    // Preparation.
    // Let's save the unique keys of the sources. This will inform which indexes to choose for full
    // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
    // sources that are not getting an indexed read.)
    let mut source_keys = BTreeMap::new();
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ,
                    ..
                } => {
                    source_keys.entry(*global_id).or_insert_with(|| {
                        typ.keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
                                    // later we can more easily compare index keys to these keys.
                                    .map(|col_idx| MirScalarExpr::column(*col_idx))
                                    .collect()
                            })
                            .collect()
                    });
                }
                _ => {}
            });
    }

    // This will be a mapping of
    // (ids used by exports and objects to build) ->
    // (arrangement keys and usage types on that id that have been requested)
    let mut index_reqs_by_id = BTreeMap::new();

    // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
    // for which we also have an available index.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
            .collect_index_reqs(build_desc.plan.as_inner_mut())?;
    }

    // Collect index usages by `sink_exports`.
    // A sink export sometimes wants to directly use an imported index. I know of one case where
    // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
    // `objects_to_build`, but will want to directly read from the index and write to a sink.
    for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
        // First, let's see if there exists an index on the id that the sink wants. If not, there is
        // nothing we can do here.
        if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
            // If yes, then we'll add a request of _some_ index: If we already collected an index
            // request on this id, then use that, otherwise use the above arbitrarily picked index.
            let requested_idxs = index_reqs_by_id
                .entry(sink_desc.from)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.first() {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::SinkExport,
                ));
            } else {
                requested_idxs.push((
                    idx_id,
                    arbitrary_idx_key.to_owned(),
                    IndexUsageType::SinkExport,
                ));
            }
        }
    }

    // Collect index usages by `index_exports`.
    for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
        // First, let's see if there exists an index on the id that the exported index is on. If
        // not, there is nothing we can do here.
        if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
            // If yes, then we'll add an index request of some index: If we already collected an
            // index request on this id, then use that, otherwise use the above arbitrarily picked
            // index.
            let requested_idxs = index_reqs_by_id
                .entry(index_desc.on_id)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.first() {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::IndexExport,
                ));
            } else {
                // This is surprising: Actually, an index creation dataflow always has a plan in
                // `objects_to_build` that will have a Get of the object that the index is on (see
                // `DataflowDescription::export_index`). Therefore, we should have already requested
                // an index usage when seeing that Get in `CollectIndexRequests`.
                soft_panic_or_log!(
                    "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
                );
                requested_idxs.push((
                    idx_id,
                    arbitrary_index_key.to_owned(),
                    IndexUsageType::IndexExport,
                ));
            }
        }
    }

    // By now, `index_reqs_by_id` has all the ids on which we think an index might be beneficial.
    // Moreover, for each of these ids, if any index exists on it, then we should have already
    // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
    // Unknown usage, picking an arbitrary index.
    for (id, index_reqs) in index_reqs_by_id.iter_mut() {
        if index_reqs.is_empty() {
            // Try to pick an arbitrary index to be fully scanned.
            if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
                soft_panic_or_log!(
                    "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
id: {}, key: {:?}",
                    id,
                    key
                );
                index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
            }
        }
    }

    // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
    // request on the same id.
    // `full_scan_changes` saves the changes that we do: Each (Get id, index id) entry indicates
    // that if a Get has that id, then any full scan index accesses on it should be changed to use
    // the indicated index id.
    let mut full_scan_changes = BTreeMap::new();
    for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
        // Let's choose a non-FullScan access (if one exists).
        if let Some((picked_idx, picked_idx_key)) = choose_index(
            &source_keys,
            get_id,
            &index_reqs
                .iter()
                .filter_map(|(idx_id, key, usage_type)| match usage_type {
                    IndexUsageType::FullScan => None,
                    _ => Some((*idx_id, key.clone())),
                })
                .collect_vec(),
        ) {
            // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
            for (idx_id, key, usage_type) in index_reqs {
                match usage_type {
                    IndexUsageType::FullScan => {
                        full_scan_changes.insert(get_id, picked_idx);
                        *idx_id = picked_idx;
                        key.clone_from(&picked_idx_key);
                    }
                    _ => {}
                }
            }
        }
    }
    // Apply the above full-scan changes to the Gets as well.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| {
                match expr {
                    MirRelationExpr::Get {
                        id: Id::Global(global_id),
                        typ: _,
                        access_strategy: persist_or_index,
                    } => {
                        if let Some(new_idx_id) = full_scan_changes.get(global_id) {
                            match persist_or_index {
                                AccessStrategy::UnknownOrLocal => {
                                    // Should have been already filled by `collect_index_reqs`.
                                    unreachable!()
                                }
                                AccessStrategy::Persist => {
                                    // We already know that it's an indexed access.
                                    unreachable!()
                                }
                                AccessStrategy::SameDataflow => {
                                    // We have not added such annotations yet.
                                    unreachable!()
                                }
                                AccessStrategy::Index(accesses) => {
                                    for (idx_id, usage_type) in accesses {
                                        if matches!(usage_type, IndexUsageType::FullScan) {
                                            *idx_id = *new_idx_id;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    _ => {}
                }
            });
    }

    // Annotate index imports by their usage types
    dataflow_metainfo.index_usage_types = BTreeMap::new();
    for (
        index_id,
        IndexImport {
            desc: index_desc,
            typ: _,
            monotonic: _,
        },
    ) in dataflow.index_imports.iter_mut()
    {
        // A sanity check that we are not importing an index that we are also exporting.
        assert!(
            !dataflow
                .index_exports
                .iter()
                .map(|(exported_index_id, _)| exported_index_id)
                .any(|exported_index_id| exported_index_id == index_id)
        );

        let mut new_usage_types = Vec::new();
        // Let's see whether something has requested an index on this object that this imported
        // index is on.
        if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
            for (req_idx_id, req_key, req_usage_type) in index_reqs {
                if req_idx_id == index_id {
                    soft_assert_eq_or_log!(*req_key, index_desc.key);
                    new_usage_types.push(req_usage_type.clone());
                }
            }
        }
        if !new_usage_types.is_empty() {
            dataflow_metainfo
                .index_usage_types
                .insert(*index_id, new_usage_types);
        }
    }

    // Prune index imports to only those that are used
    dataflow
        .index_imports
        .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));

    // Determine AccessStrategy::SameDataflow accesses. These were classified as
    // AccessStrategy::Persist inside collect_index_reqs, so now we check each of them, and if the
    // id refers to a collection that we are building ourselves, we adjust the access strategy.
    let mut objects_to_build_ids = BTreeSet::new();
    for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
        objects_to_build_ids.insert(id.clone());
    }
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ: _,
                    access_strategy,
                } => match access_strategy {
                    AccessStrategy::Persist => {
                        if objects_to_build_ids.contains(global_id) {
                            *access_strategy = AccessStrategy::SameDataflow;
                        }
                    }
                    _ => {}
                },
                _ => {}
            });
    }

    // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(_),
                    typ: _,
                    access_strategy: AccessStrategy::Index(accesses),
                } => {
                    for (idx_id, _) in accesses {
                        soft_assert_or_log!(
                            dataflow.index_imports.contains_key(idx_id),
                            "Dangling Get index annotation"
                        );
                    }
                }
                _ => {}
            });
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Picks an index from the given `Vec` of (index id, index key) pairs.
///
/// Currently, we pick as follows:
///  - If there is an index on a unique key, then we pick that. (It might be better distributed, and
///    is less likely to get dropped than other indexes.)
///  - Otherwise, we pick an arbitrary index.
///
/// TODO: There are various edge cases where a better choice would be possible:
/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
///   capture this already.)
/// - Some indexes might have an error, while others don't.
///   <https://github.com/MaterializeInc/database-issues/issues/4455>
/// - Some indexes might have more extra data in their keys (because of being on more complicated
///   expressions than just column references), which won't be used in a full scan.
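///
/// For example (a sketch with hypothetical keys), if the collection has the
/// unique key `[#0]` recorded in `source_keys`, and `indexes` contains one
/// index on `[#1]` and one on `[#0]`, the index on `[#0]` is chosen; if no
/// index key matches a unique key, the first index in `indexes` is returned.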
fn choose_index(
    source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    id: &GlobalId,
    indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
    match source_keys.get(id) {
        None => indexes.first().cloned(), // pick an arbitrary index
        Some(coll_keys) => match indexes
            .iter()
            .find(|(_idx_id, key)| coll_keys.contains(&*key))
        {
            Some((idx_id, key)) => Some((*idx_id, key.clone())),
            None => indexes.first().cloned(), // pick an arbitrary index
        },
    }
}

#[derive(Debug)]
struct CollectIndexRequests<'a> {
    /// We were told about these unique keys on sources.
    source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    /// We were told about these indexes being available.
    indexes_available: &'a dyn IndexOracle,
    /// We'll be collecting index requests here.
    index_reqs_by_id:
        &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
    /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
    /// current node (see docs on `IndexUsageContext` about what context we keep).
    /// Moreover, we need to propagate this context from cte uses to cte definitions.
    /// `context_across_lets` accumulates, for each `LocalId`, the union of the contexts that
    /// reached each of its uses.
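    ///
    /// For example (a sketch), in `Let l0 = <value> in <body>`, if `<body>`
    /// contains two `Get l0` nodes, one under a differential join and one
    /// under a delta join, then both contexts accumulate in
    /// `context_across_lets[l0]` and are applied when we later descend into
    /// `<value>`.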
    context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
    recursion_guard: RecursionGuard,
}

impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

impl<'a> CollectIndexRequests<'a> {
    fn new(
        source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
        indexes_available: &'a dyn IndexOracle,
        index_reqs_by_id: &'a mut BTreeMap<
            GlobalId,
            Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
        >,
    ) -> CollectIndexRequests<'a> {
        CollectIndexRequests {
            source_keys,
            indexes_available,
            index_reqs_by_id,
            context_across_lets: BTreeMap::new(),
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    pub fn collect_index_reqs(
        &mut self,
        expr: &mut MirRelationExpr,
    ) -> Result<(), RecursionLimitError> {
        assert!(self.context_across_lets.is_empty());
        self.collect_index_reqs_inner(
            expr,
            &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
        )?;
        assert!(self.context_across_lets.is_empty());
        // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
        for (_id, index_reqs) in self.index_reqs_by_id.iter() {
            for (_, _, index_usage_type) in index_reqs {
                soft_assert_or_log!(
                    !matches!(
                        index_usage_type,
                        IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
                    ),
                    "Delta join Unknown index usage remained"
                );
            }
        }
        Ok(())
    }

    fn collect_index_reqs_inner(
        &mut self,
        expr: &mut MirRelationExpr,
        contexts: &Vec<IndexUsageContext>,
    ) -> Result<(), RecursionLimitError> {
        self.checked_recur_mut(|this| {

            // If an index exists on `on_id`, this function picks an index to be fully scanned.
            let pick_index_for_full_scan = |on_id: &GlobalId| {
                // Note that the choice we make here might be modified later at the
                // "Adjust FullScans to not introduce a new index dependency".
                choose_index(
                    this.source_keys,
                    on_id,
                    &this.indexes_available.indexes_on(*on_id).map(
                        |(idx_id, key)| (idx_id, key.iter().cloned().collect_vec())
                    ).collect_vec()
                )
            };

            // See comment on `IndexUsageContext`.
            Ok(match expr {
                MirRelationExpr::Join {
                    inputs,
                    implementation,
                    ..
                } => {
                    match implementation {
                        JoinImplementation::Differential(..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(
                                        IndexUsageType::DifferentialJoin,
                                    ),
                                )?;
                            }
                        }
                        JoinImplementation::DeltaQuery(..) => {
                            // For Delta joins, the first input is special, see
                            // https://github.com/MaterializeInc/database-issues/issues/2115
                            this.collect_index_reqs_inner(
                                &mut inputs[0],
                                &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)),
                            )?;
                            for input in &mut inputs[1..] {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                        DeltaJoinIndexUsageType::Lookup,
                                    )),
                                )?;
                            }
                        }
                        JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(*idx_id)),
                                )?;
                            }
                        }
                        JoinImplementation::Unimplemented => {
                            soft_panic_or_log!(
                                "CollectIndexRequests encountered an Unimplemented join"
                            );
                        }
                    }
                }
                MirRelationExpr::ArrangeBy { input, keys } => {
                    this.collect_index_reqs_inner(input, &IndexUsageContext::add_keys(contexts, keys))?;
                }
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    access_strategy: persist_or_index,
                    ..
                } => {
                    this.index_reqs_by_id
                        .entry(*global_id)
                        .or_insert_with(Vec::new);
                    // If the context is empty, it means we didn't see an operator that would
                    // specifically want to use an index for this Get. However, let's still try to
                    // find an index for a full scan.
                    let mut try_full_scan = contexts.is_empty();
                    let mut index_accesses = Vec::new();
                    for context in contexts {
                        match &context.requested_keys {
                            None => {
                                // We have some index usage context, but didn't see an `ArrangeBy`.
                                try_full_scan = true;
                                match context.usage_type {
                                    IndexUsageType::FullScan | IndexUsageType::SinkExport | IndexUsageType::IndexExport => {
                                        // Not possible, because these don't go through
                                        // IndexUsageContext at all.
                                        unreachable!()
                                    },
                                    // You can find more info on why the following join cases
                                    // shouldn't happen in comments of the Join lowering to LIR.
                                    IndexUsageType::Lookup(_) => soft_panic_or_log!("CollectIndexRequests encountered an IndexedFilter join without an ArrangeBy"),
                                    IndexUsageType::DifferentialJoin => soft_panic_or_log!("CollectIndexRequests encountered a Differential join without an ArrangeBy"),
                                    IndexUsageType::DeltaJoin(_) => soft_panic_or_log!("CollectIndexRequests encountered a Delta join without an ArrangeBy"),
                                    IndexUsageType::PlanRootNoArrangement => {
                                        // This is ok: the entire plan is a `Get`, with not even an
                                        // `ArrangeBy`. Note that if an index exists, the usage will
                                        // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
                                        // because we are going into the `try_full_scan` if.
                                    },
                                    IndexUsageType::FastPathLimit => {
                                        // These are created much later, not even inside
                                        // `prune_and_annotate_dataflow_index_imports`.
                                        unreachable!()
                                    }
                                    IndexUsageType::DanglingArrangeBy => {
                                        // Not possible, because we create `DanglingArrangeBy`
                                        // only when we see an `ArrangeBy`.
                                        unreachable!()
                                    },
                                    IndexUsageType::Unknown => {
                                        // These are added only after `CollectIndexRequests` has run.
                                        unreachable!()
                                    }
                                }
                            }
                            Some(requested_keys) => {
                                for requested_key in requested_keys {
                                    match this
                                        .indexes_available
                                        .indexes_on(*global_id)
                                        .find(|(available_idx_id, available_key)| {
                                            match context.usage_type {
                                                IndexUsageType::Lookup(req_idx_id) => {
                                                    // `LiteralConstraints` already picked an index
                                                    // by id. Let's use that one.
                                                    assert!(!(available_idx_id == &req_idx_id && available_key != &requested_key));
                                                    available_idx_id == &req_idx_id
                                                },
                                                _ => available_key == &requested_key,
                                            }
                                        })
                                    {
                                        Some((idx_id, key)) => {
                                            this.index_reqs_by_id
                                                .get_mut(global_id)
                                                .unwrap()
                                                .push((idx_id, key.to_owned(), context.usage_type.clone()));
                                            index_accesses.push((idx_id, context.usage_type.clone()));
                                        },
                                        None => {
                                            // If there is a key requested for which we don't have an
                                            // index, then we might still be able to do a full scan of a
                                            // differently keyed index.
                                            try_full_scan = true;
                                        }
                                    }
                                }
                                if requested_keys.is_empty() {
                                    // It's a bit weird if an MIR ArrangeBy is not requesting any
                                    // key, but let's try a full scan in that case anyhow.
                                    try_full_scan = true;
                                }
                            }
                        }
                    }
                    if try_full_scan {
                        // Keep in mind that when having 2 contexts coming from 2 uses of a Let,
                        // this code can't distinguish between the case when there is 1 ArrangeBy at the
                        // top of the Let, or when the 2 uses each have an `ArrangeBy`. In both cases,
                        // we'll add only 1 full scan, which would be wrong in the latter case. However,
                        // the latter case can't currently happen until we do
                        // https://github.com/MaterializeInc/database-issues/issues/6363
                        // Also note that currently we are deduplicating index usage types when
                        // printing index usages in EXPLAIN.
                        if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
                            this.index_reqs_by_id
                                .get_mut(global_id)
                                .unwrap()
                                .push((idx_id, key.to_owned(), IndexUsageType::FullScan));
                            index_accesses.push((idx_id, IndexUsageType::FullScan));
                        }
                    }
                    if index_accesses.is_empty() {
                        *persist_or_index = AccessStrategy::Persist;
                    } else {
                        *persist_or_index = AccessStrategy::Index(index_accesses);
                    }
                }
                MirRelationExpr::Get {
                    id: Id::Local(local_id),
                    ..
                } => {
                    // Add the current context to the vector of contexts of `local_id`.
                    // (The unwrap is safe, because the Let and LetRec cases start with inserting an
                    // empty entry.)
                    this.context_across_lets
                        .get_mut(local_id)
                        .unwrap()
                        .extend(contexts.iter().cloned());
                    // No recursive call here, because Get has no inputs.
                }
                MirRelationExpr::Let { id, value, body } => {
                    let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
                    // No shadowing in MIR
                    assert_none!(shadowed_context);
                    // We go backwards: Recurse on the body and then the value.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // The above call filled in the entry for `id` in `context_across_lets` (if it
                    // was referenced). Anyhow, at least an empty entry should exist, because we started
                    // above with inserting it.
                    this.collect_index_reqs_inner(
                        value,
                        &this.context_across_lets[id].clone(),
                    )?;
                    // Clean up the id from the saved contexts.
                    this.context_across_lets.remove(id);
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits: _,
                    body,
                } => {
                    for id in ids.iter() {
                        let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
                        assert_none!(shadowed_context); // No shadowing in MIR
                    }
                    // We go backwards: Recurse on the body first.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // Reset the contexts of the ids (of the current LetRec), because an arrangement
                    // from a value can't be used in the body.
                    for id in ids.iter() {
                        *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
                    }
                    // Recurse on the values in reverse order.
                    // Note that we do only one pass, i.e., we won't see context through a Get that
                    // refers to the previous iteration. But this is ok, because we can't reuse
                    // arrangements across iterations anyway.
                    for (id, value) in ids.iter().zip(values.iter_mut()).rev() {
                        this.collect_index_reqs_inner(
                            value,
                            &this.context_across_lets[id].clone(),
                        )?;
                    }
                    // Clean up the ids from the saved contexts.
                    for id in ids {
                        this.context_across_lets.remove(id);
                    }
                }
                _ => {
                    // Nothing interesting at this node, recurse with the empty context (regardless of
                    // what context we got from above).
                    let empty_context = Vec::new();
                    for child in expr.children_mut() {
                        this.collect_index_reqs_inner(child, &empty_context)?;
                    }
                }
            })
        })
    }
}

/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
/// looking for is
/// ```text
/// <operation that uses an index>
///   ArrangeBy <requested_keys>
///     Get <global_id>
/// ```
/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
/// pattern is present above the `Get`.
///
/// Note that we usually put this struct in a Vec, because we track context across local let
/// bindings, which means that a node can have multiple parents.
#[derive(Debug, Clone)]
struct IndexUsageContext {
    usage_type: IndexUsageType,
    requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
}

impl IndexUsageContext {
    pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
        vec![IndexUsageContext {
            usage_type,
            requested_keys: None,
        }]
    }

    // Adds the keys of an ArrangeBy into the contexts.
    // Soft-panics if we haven't already seen something that indicates what the index will be used
    // for.
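    //
    // For example (a sketch), for the first input of a delta join with
    // `ArrangeBy` keys `[[#1], [#0]]`, the minimum key `[#0]` plays the role
    // of `source_key` below, yielding a `DeltaJoin(FirstInputFullScan)`
    // context for `[#0]` and a `DeltaJoin(Lookup)` context for `[#1]`.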
    pub fn add_keys(
        old_contexts: &Vec<IndexUsageContext>,
        keys_to_add: &Vec<Vec<MirScalarExpr>>,
    ) -> Vec<IndexUsageContext> {
        let old_contexts = if old_contexts.is_empty() {
            // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
            soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
            // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
            // have a place to note down the requested keys below.
            IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
        } else {
            old_contexts.clone()
        };
        old_contexts
            .into_iter()
            .flat_map(|old_context| {
                if !matches!(
                    old_context.usage_type,
                    IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
                ) {
                    // If it's not an unknown delta join usage, then we simply note down the new
                    // keys into `requested_keys`.
                    let mut context = old_context.clone();
                    if context.requested_keys.is_none() {
                        context.requested_keys = Some(BTreeSet::new());
                    }
                    context
                        .requested_keys
                        .as_mut()
                        .unwrap()
                        .extend(keys_to_add.iter().cloned());
                    Some(context).into_iter().chain(None)
                } else {
                    // If it's an unknown delta join usage, then we need to figure out whether this
                    // is a full scan or a lookup.
                    //
                    // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
                    // scan when starting the rendering of a delta path. This is the one for which
                    // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
                    //
                    // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
                    // the `source_key` based on the MIR plan. We do this by doing the same as
                    // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
                    let source_key = keys_to_add
                        .iter()
                        .min()
                        .expect("ArrangeBy below a delta join has at least one key");
                    let full_scan_context = IndexUsageContext {
                        requested_keys: Some(BTreeSet::from([source_key.clone()])),
                        usage_type: IndexUsageType::DeltaJoin(
                            DeltaJoinIndexUsageType::FirstInputFullScan,
                        ),
                    };
                    let lookup_keys = keys_to_add
                        .into_iter()
                        .filter(|key| *key != source_key)
                        .cloned()
                        .collect_vec();
                    if lookup_keys.is_empty() {
                        Some(full_scan_context).into_iter().chain(None)
                    } else {
                        let lookup_context = IndexUsageContext {
                            requested_keys: Some(lookup_keys.into_iter().collect()),
                            usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
                        };
                        Some(full_scan_context)
                            .into_iter()
                            .chain(Some(lookup_context))
                    }
                }
            })
            .collect()
    }
}

/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
    /// Notices that the optimizer wants to show to users.
    /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
    pub optimizer_notices: Vec<Notice>,
    /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
    /// `prune_and_annotate_dataflow_index_imports`.
    pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
}

impl Default for DataflowMetainfo {
    fn default() -> Self {
        DataflowMetainfo {
            optimizer_notices: Vec::new(),
            index_usage_types: BTreeMap::new(),
        }
    }
}

impl<Notice> DataflowMetainfo<Notice> {
    /// Create a [`UsedIndexes`] instance by resolving each id in
    /// `df_desc.index_imports` against an entry expected to exist in
    /// [`DataflowMetainfo::index_usage_types`].
    pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
        UsedIndexes::new(
            df_desc
                .index_imports
                .iter()
                .map(|(id, _)| {
                    let entry = self.index_usage_types.get(id).cloned();
                    // If an entry does not exist, mark the usage type for this
                    // index as `Unknown`.
                    //
                    // This should never happen if this method is called after
                    // running `prune_and_annotate_dataflow_index_imports` on
                    // the dataflow (this happens at the end of the
                    // `optimize_dataflow` call).
                    let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);

                    (*id, index_usage_type)
                })
                .collect(),
        )
    }
}

impl DataflowMetainfo<RawOptimizerNotice> {
    /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
    /// only if the exact same notice is not already present.
    pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
    where
        T: Into<RawOptimizerNotice>,
    {
        let notice = notice.into();
        if !self.optimizer_notices.contains(&notice) {
            self.optimizer_notices.push(notice);
        }
    }
}