mz_transform/
dataflow.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Whole-dataflow optimization
11//!
12//! A dataflow may contain multiple views, each of which may only be
13//! optimized locally. However, information like demand and predicate
14//! pushdown can be applied across views once we understand the context
15//! in which the views will be executed.
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use itertools::Itertools;
20use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
21use mz_expr::{
22    AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
23    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
24};
25use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
26use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
27use mz_repr::GlobalId;
28use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32use crate::monotonic::MonotonicFlag;
33use crate::notice::RawOptimizerNotice;
34use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};
35
36/// Optimizes the implementation of each dataflow.
37///
38/// Inlines views, performs a full optimization pass including physical
39/// planning using the supplied indexes, propagates filtering and projection
40/// information to dataflow sources and lifts monotonicity information.
41#[mz_ore::instrument(
42    target = "optimizer",
43    level = "debug",
44    fields(path.segment ="global")
45)]
46pub fn optimize_dataflow(
47    dataflow: &mut DataflowDesc,
48    transform_ctx: &mut TransformCtx,
49    fast_path_optimizer: bool,
50) -> Result<(), TransformError> {
51    // Inline views that are used in only one other view.
52    inline_views(dataflow)?;
53
54    if fast_path_optimizer {
55        optimize_dataflow_relations(
56            dataflow,
57            &Optimizer::fast_path_optimizer(transform_ctx),
58            transform_ctx,
59        )?;
60    } else {
61        // Logical optimization pass after view inlining
62        optimize_dataflow_relations(
63            dataflow,
64            #[allow(deprecated)]
65            &Optimizer::logical_optimizer(transform_ctx),
66            transform_ctx,
67        )?;
68
69        optimize_dataflow_filters(dataflow)?;
70        // TODO: when the linear operator contract ensures that propagated
71        // predicates are always applied, projections and filters can be removed
72        // from where they come from. Once projections and filters can be removed,
73        // TODO: it would be useful for demand to be optimized after filters
74        // that way demand only includes the columns that are still necessary after
75        // the filters are applied.
76        optimize_dataflow_demand(dataflow)?;
77
78        // A smaller logical optimization pass after projections and filters are
79        // pushed down across views.
80        optimize_dataflow_relations(
81            dataflow,
82            &Optimizer::logical_cleanup_pass(transform_ctx, false),
83            transform_ctx,
84        )?;
85
86        // Physical optimization pass
87        optimize_dataflow_relations(
88            dataflow,
89            &Optimizer::physical_optimizer(transform_ctx),
90            transform_ctx,
91        )?;
92
93        optimize_dataflow_monotonic(dataflow, transform_ctx)?;
94    }
95
96    prune_and_annotate_dataflow_index_imports(
97        dataflow,
98        transform_ctx.indexes,
99        transform_ctx.df_meta,
100    )?;
101
102    mz_repr::explain::trace_plan(dataflow);
103
104    Ok(())
105}
106
107/// Inline views used in one other view, and in no exported objects.
108#[mz_ore::instrument(
109    target = "optimizer",
110    level = "debug",
111    fields(path.segment = "inline_views")
112)]
113fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
114    // We cannot inline anything whose `BuildDesc::id` appears in either the
115    // `index_exports` or `sink_exports` of `dataflow`, because we lose our
116    // ability to name it.
117
118    // A view can / should be in-lined in another view if it is only used by
119    // one subsequent view. If there are two distinct views that have not
120    // themselves been merged, then too bad and it doesn't get inlined.
121
122    // Starting from the *last* object to build, walk backwards and inline
123    // any view that is neither referenced by a `index_exports` nor
124    // `sink_exports` nor more than two remaining objects to build.
125
126    for index in (0..dataflow.objects_to_build.len()).rev() {
127        // Capture the name used by others to reference this view.
128        let global_id = dataflow.objects_to_build[index].id;
129        // Determine if any exports directly reference this view.
130        let mut occurs_in_export = false;
131        for (_gid, sink_desc) in dataflow.sink_exports.iter() {
132            if sink_desc.from == global_id {
133                occurs_in_export = true;
134            }
135        }
136        for (_, (index_desc, _)) in dataflow.index_exports.iter() {
137            if index_desc.on_id == global_id {
138                occurs_in_export = true;
139            }
140        }
141        // Count the number of subsequent views that reference this view.
142        let mut occurrences_in_later_views = Vec::new();
143        for other in (index + 1)..dataflow.objects_to_build.len() {
144            if dataflow.objects_to_build[other]
145                .plan
146                .depends_on()
147                .contains(&global_id)
148            {
149                occurrences_in_later_views.push(other);
150            }
151        }
152        // Inline if the view is referenced in one view and no exports.
153        if !occurs_in_export && occurrences_in_later_views.len() == 1 {
154            let other = occurrences_in_later_views[0];
155            // We can remove this view and insert it in the later view,
156            // but are not able to relocate the later view `other`.
157
158            // When splicing in the `index` view, we need to create disjoint
159            // identifiers for the Let's `body` and `value`, as well as a new
160            // identifier for the binding itself. Following `NormalizeLets`, we
161            // go with the binding first, then the value, then the body.
162            let mut id_gen = crate::IdGen::default();
163            let new_local = LocalId::new(id_gen.allocate_id());
164            // Use the same `id_gen` to assign new identifiers to `index`.
165            crate::normalize_lets::renumber_bindings(
166                dataflow.objects_to_build[index].plan.as_inner_mut(),
167                &mut id_gen,
168            )?;
169            // Assign new identifiers to the other relation.
170            crate::normalize_lets::renumber_bindings(
171                dataflow.objects_to_build[other].plan.as_inner_mut(),
172                &mut id_gen,
173            )?;
174            // Install the `new_local` name wherever `global_id` was used.
175            dataflow.objects_to_build[other]
176                .plan
177                .as_inner_mut()
178                .visit_pre_mut(|expr| {
179                    if let MirRelationExpr::Get { id, .. } = expr {
180                        if id == &Id::Global(global_id) {
181                            *id = Id::Local(new_local);
182                        }
183                    }
184                });
185
186            // With identifiers rewritten, we can replace `other` with
187            // a `MirRelationExpr::Let` binding, whose value is `index` and
188            // whose body is `other`.
189            let body = dataflow.objects_to_build[other]
190                .plan
191                .as_inner_mut()
192                .take_dangerous();
193            let value = dataflow.objects_to_build[index]
194                .plan
195                .as_inner_mut()
196                .take_dangerous();
197            *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
198                id: new_local,
199                value: Box::new(value),
200                body: Box::new(body),
201            };
202            dataflow.objects_to_build.remove(index);
203        }
204    }
205
206    mz_repr::explain::trace_plan(dataflow);
207
208    Ok(())
209}
210
211/// Performs either the logical or the physical optimization pass on the
212/// dataflow using the supplied set of indexes.
213#[mz_ore::instrument(
214    target = "optimizer",
215    level = "debug",
216    fields(path.segment = optimizer.name)
217)]
218fn optimize_dataflow_relations(
219    dataflow: &mut DataflowDesc,
220    optimizer: &Optimizer,
221    ctx: &mut TransformCtx,
222) -> Result<(), TransformError> {
223    // Re-optimize each dataflow
224    for object in dataflow.objects_to_build.iter_mut() {
225        // Re-run all optimizations on the composite views.
226        ctx.set_global_id(object.id);
227        optimizer.transform(object.plan.as_inner_mut(), ctx)?;
228        ctx.reset_global_id();
229    }
230
231    mz_repr::explain::trace_plan(dataflow);
232
233    Ok(())
234}
235
236/// Pushes demand information from published outputs to dataflow inputs,
237/// projecting away unnecessary columns.
238///
239/// Dataflows that exist for the sake of generating plan explanations do not
240/// have published outputs. In this case, we push demand information from views
241/// not depended on by other views to dataflow inputs.
242#[mz_ore::instrument(
243    target = "optimizer",
244    level = "debug",
245    fields(path.segment ="demand")
246)]
247fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
248    // Maps id -> union of known columns demanded from the source/view with the
249    // corresponding id.
250    let mut demand = BTreeMap::new();
251
252    if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
253        // In the absence of any exports, just demand all columns from views
254        // that are not depended on by another view, which is currently the last
255        // object in `objects_to_build`.
256
257        // A DataflowDesc without exports is currently created in the context of
258        // EXPLAIN outputs. This ensures that the output has all the columns of
259        // the original explainee.
260        if let Some(build_desc) = dataflow.objects_to_build.iter_mut().rev().next() {
261            demand
262                .entry(Id::Global(build_desc.id))
263                .or_insert_with(BTreeSet::new)
264                .extend(0..build_desc.plan.as_inner_mut().arity());
265        }
266    } else {
267        // Demand all columns of inputs to sinks.
268        for (_id, sink) in dataflow.sink_exports.iter() {
269            let input_id = sink.from;
270            demand
271                .entry(Id::Global(input_id))
272                .or_insert_with(BTreeSet::new)
273                .extend(0..dataflow.arity_of(&input_id));
274        }
275
276        // Demand all columns of inputs to exported indexes.
277        for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
278            let input_id = desc.on_id;
279            demand
280                .entry(Id::Global(input_id))
281                .or_insert_with(BTreeSet::new)
282                .extend(0..dataflow.arity_of(&input_id));
283        }
284    }
285
286    optimize_dataflow_demand_inner(
287        dataflow
288            .objects_to_build
289            .iter_mut()
290            .rev()
291            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
292        &mut demand,
293    )?;
294
295    mz_repr::explain::trace_plan(dataflow);
296
297    Ok(())
298}
299
300/// Pushes demand through views in `view_sequence` in order, removing
301/// columns not demanded.
302///
303/// This method is made public for the sake of testing.
304/// TODO: make this private once we allow multiple exports per dataflow.
305pub fn optimize_dataflow_demand_inner<'a, I>(
306    view_sequence: I,
307    demand: &mut BTreeMap<Id, BTreeSet<usize>>,
308) -> Result<(), TransformError>
309where
310    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
311{
312    // Maps id -> The projection that was pushed down on the view with the
313    // corresponding id.
314    let mut applied_projection = BTreeMap::new();
315    // Collect the mutable references to views after pushing projection down
316    // in order to run cleanup actions on them in a second loop.
317    let mut view_refs = Vec::new();
318    let projection_pushdown = crate::movement::ProjectionPushdown::default();
319    for (id, view) in view_sequence {
320        if let Some(columns) = demand.get(&id) {
321            let projection_pushed_down = columns.iter().map(|c| *c).collect();
322            // Push down the projection consisting of the entries of `columns`
323            // in increasing order.
324            projection_pushdown.action(view, &projection_pushed_down, demand)?;
325            let new_type = view.typ();
326            applied_projection.insert(id, (projection_pushed_down, new_type));
327        }
328        view_refs.push(view);
329    }
330
331    for view in view_refs {
332        // Update `Get` nodes to reflect any columns that have been projected away.
333        projection_pushdown.update_projection_around_get(view, &applied_projection);
334    }
335
336    Ok(())
337}
338
339/// Pushes predicate to dataflow inputs.
340#[mz_ore::instrument(
341    target = "optimizer",
342    level = "debug",
343    fields(path.segment ="filters")
344)]
345fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
346    // Contains id -> predicates map, describing those predicates that
347    // can (but need not) be applied to the collection named by `id`.
348    let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();
349
350    // Propagate predicate information from outputs to inputs.
351    optimize_dataflow_filters_inner(
352        dataflow
353            .objects_to_build
354            .iter_mut()
355            .rev()
356            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
357        &mut predicates,
358    )?;
359
360    // Push predicate information into the SourceDesc.
361    for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
362        if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
363            if !list.is_empty() {
364                // Canonicalize the order of predicates, for stable plans.
365                let mut list = list.into_iter().collect::<Vec<_>>();
366                list.sort();
367                // Install no-op predicate information if none exists.
368                if source.arguments.operators.is_none() {
369                    source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
370                }
371                // Add any predicates that can be pushed to the source.
372                if let Some(operator) = source.arguments.operators.take() {
373                    source.arguments.operators = Some(operator.filter(list));
374                    source.arguments.operators.as_mut().map(|x| x.optimize());
375                }
376            }
377        }
378    }
379
380    mz_repr::explain::trace_plan(dataflow);
381
382    Ok(())
383}
384
385/// Pushes filters down through views in `view_sequence` in order.
386///
387/// This method is made public for the sake of testing.
388/// TODO: make this private once we allow multiple exports per dataflow.
389pub fn optimize_dataflow_filters_inner<'a, I>(
390    view_iter: I,
391    predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
392) -> Result<(), TransformError>
393where
394    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
395{
396    let transform = crate::predicate_pushdown::PredicatePushdown::default();
397    for (id, view) in view_iter {
398        if let Some(list) = predicates.get(&id).clone() {
399            if !list.is_empty() {
400                *view = view.take_dangerous().filter(list.iter().cloned());
401            }
402        }
403        transform.action(view, predicates)?;
404    }
405    Ok(())
406}
407
408/// Propagates information about monotonic inputs through operators,
409/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
410#[mz_ore::instrument(
411    target = "optimizer",
412    level = "debug",
413    fields(path.segment ="monotonic")
414)]
415pub fn optimize_dataflow_monotonic(
416    dataflow: &mut DataflowDesc,
417    ctx: &mut TransformCtx,
418) -> Result<(), TransformError> {
419    let mut monotonic_ids = BTreeSet::new();
420    for (source_id, (_source, is_monotonic, _upper)) in dataflow.source_imports.iter() {
421        if *is_monotonic {
422            monotonic_ids.insert(source_id.clone());
423        }
424    }
425    for (
426        _index_id,
427        IndexImport {
428            desc: index_desc,
429            monotonic,
430            ..
431        },
432    ) in dataflow.index_imports.iter()
433    {
434        if *monotonic {
435            monotonic_ids.insert(index_desc.on_id.clone());
436        }
437    }
438
439    let monotonic_flag = MonotonicFlag::default();
440
441    for build_desc in dataflow.objects_to_build.iter_mut() {
442        monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
443    }
444
445    mz_repr::explain::trace_plan(dataflow);
446
447    Ok(())
448}
449
450/// Restricts the indexes imported by `dataflow` to only the ones it needs.
451/// It also adds to the `DataflowMetainfo` how each index will be used.
452/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
453/// their index usage types.
454///
455/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
456/// references.
457///
458/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
459/// `ArrangeBy`s at the top of unused Let bindings.
460#[mz_ore::instrument(
461    target = "optimizer",
462    level = "debug",
463    fields(path.segment = "index_imports")
464)]
465fn prune_and_annotate_dataflow_index_imports(
466    dataflow: &mut DataflowDesc,
467    indexes: &dyn IndexOracle,
468    dataflow_metainfo: &mut DataflowMetainfo,
469) -> Result<(), TransformError> {
470    // Preparation.
471    // Let's save the unique keys of the sources. This will inform which indexes to choose for full
472    // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
473    // sources that are not getting an indexed read.)
474    let mut source_keys = BTreeMap::new();
475    for build_desc in dataflow.objects_to_build.iter() {
476        build_desc
477            .plan
478            .as_inner()
479            .visit_pre(|expr: &MirRelationExpr| match expr {
480                MirRelationExpr::Get {
481                    id: Id::Global(global_id),
482                    typ,
483                    ..
484                } => {
485                    source_keys.entry(*global_id).or_insert_with(|| {
486                        typ.keys
487                            .iter()
488                            .map(|key| {
489                                key.iter()
490                                    // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
491                                    // later we can more easily compare index keys to these keys.
492                                    .map(|col_idx| MirScalarExpr::Column(*col_idx))
493                                    .collect()
494                            })
495                            .collect()
496                    });
497                }
498                _ => {}
499            });
500    }
501
502    // This will be a mapping of
503    // (ids used by exports and objects to build) ->
504    // (arrangement keys and usage types on that id that have been requested)
505    let mut index_reqs_by_id = BTreeMap::new();
506
507    // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
508    // for which we also have an available index.
509    for build_desc in dataflow.objects_to_build.iter_mut() {
510        CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
511            .collect_index_reqs(build_desc.plan.as_inner_mut())?;
512    }
513
514    // Collect index usages by `sink_exports`.
515    // A sink export sometimes wants to directly use an imported index. I know of one case where
516    // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
517    // `objects_to_build`, but will want to directly read from the index and write to a sink.
518    for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
519        // First, let's see if there exists an index on the id that the sink wants. If not, there is
520        // nothing we can do here.
521        if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
522            // If yes, then we'll add a request of _some_ index: If we already collected an index
523            // request on this id, then use that, otherwise use the above arbitrarily picked index.
524            let requested_idxs = index_reqs_by_id
525                .entry(sink_desc.from)
526                .or_insert_with(Vec::new);
527            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
528                requested_idxs.push((
529                    *already_req_idx_id,
530                    already_req_key.clone(),
531                    IndexUsageType::SinkExport,
532                ));
533            } else {
534                requested_idxs.push((
535                    idx_id,
536                    arbitrary_idx_key.to_owned(),
537                    IndexUsageType::SinkExport,
538                ));
539            }
540        }
541    }
542
543    // Collect index usages by `index_exports`.
544    for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
545        // First, let's see if there exists an index on the id that the exported index is on. If
546        // not, there is nothing we can do here.
547        if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
548            // If yes, then we'll add an index request of some index: If we already collected an
549            // index request on this id, then use that, otherwise use the above arbitrarily picked
550            // index.
551            let requested_idxs = index_reqs_by_id
552                .entry(index_desc.on_id)
553                .or_insert_with(Vec::new);
554            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
555                requested_idxs.push((
556                    *already_req_idx_id,
557                    already_req_key.clone(),
558                    IndexUsageType::IndexExport,
559                ));
560            } else {
561                // This is surprising: Actually, an index creation dataflow always has a plan in
562                // `objects_to_build` that will have a Get of the object that the index is on (see
563                // `DataflowDescription::export_index`). Therefore, we should have already requested
564                // an index usage when seeing that Get in `CollectIndexRequests`.
565                soft_panic_or_log!(
566                    "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
567                );
568                requested_idxs.push((
569                    idx_id,
570                    arbitrary_index_key.to_owned(),
571                    IndexUsageType::IndexExport,
572                ));
573            }
574        }
575    }
576
577    // By now, `index_reqs_by_id` has all ids that we think might benefit from having an index on.
578    // Moreover, for each of these ids, if any index exists on it, then we should have already
579    // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
580    // Unknown usage, picking an arbitrary index.
581    for (id, index_reqs) in index_reqs_by_id.iter_mut() {
582        if index_reqs.is_empty() {
583            // Try to pick an arbitrary index to be fully scanned.
584            if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
585                soft_panic_or_log!(
586                    "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
587id: {}, key: {:?}",
588                    id,
589                    key
590                );
591                index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
592            }
593        }
594    }
595
596    // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
597    // request on the same id.
598    // `full_scan_changes` saves the changes that we do: Each (Get id, index id) entry indicates
599    // that if a Get has that id, then any full scan index accesses on it should be changed to use
600    // the indicated index id.
601    let mut full_scan_changes = BTreeMap::new();
602    for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
603        // Let's choose a non-FullScan access (if exists).
604        if let Some((picked_idx, picked_idx_key)) = choose_index(
605            &source_keys,
606            get_id,
607            &index_reqs
608                .iter()
609                .filter_map(|(idx_id, key, usage_type)| match usage_type {
610                    IndexUsageType::FullScan => None,
611                    _ => Some((*idx_id, key.clone())),
612                })
613                .collect_vec(),
614        ) {
615            // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
616            for (idx_id, key, usage_type) in index_reqs {
617                match usage_type {
618                    IndexUsageType::FullScan => {
619                        full_scan_changes.insert(get_id, picked_idx);
620                        *idx_id = picked_idx;
621                        key.clone_from(&picked_idx_key);
622                    }
623                    _ => {}
624                }
625            }
626        }
627    }
628    // Apply the above full scan changes to also the Gets.
629    for build_desc in dataflow.objects_to_build.iter_mut() {
630        build_desc
631            .plan
632            .as_inner_mut()
633            .visit_pre_mut(|expr: &mut MirRelationExpr| {
634                match expr {
635                    MirRelationExpr::Get {
636                        id: Id::Global(global_id),
637                        typ: _,
638                        access_strategy: persist_or_index,
639                    } => {
640                        if let Some(new_idx_id) = full_scan_changes.get(global_id) {
641                            match persist_or_index {
642                                AccessStrategy::UnknownOrLocal => {
643                                    // Should have been already filled by `collect_index_reqs`.
644                                    unreachable!()
645                                }
646                                AccessStrategy::Persist => {
647                                    // We already know that it's an indexed access.
648                                    unreachable!()
649                                }
650                                AccessStrategy::SameDataflow => {
651                                    // We have not added such annotations yet.
652                                    unreachable!()
653                                }
654                                AccessStrategy::Index(accesses) => {
655                                    for (idx_id, usage_type) in accesses {
656                                        if matches!(usage_type, IndexUsageType::FullScan) {
657                                            *idx_id = *new_idx_id;
658                                        }
659                                    }
660                                }
661                            }
662                        }
663                    }
664                    _ => {}
665                }
666            });
667    }
668
669    // Annotate index imports by their usage types
670    dataflow_metainfo.index_usage_types = BTreeMap::new();
671    for (
672        index_id,
673        IndexImport {
674            desc: index_desc,
675            typ: _,
676            monotonic: _,
677        },
678    ) in dataflow.index_imports.iter_mut()
679    {
680        // A sanity check that we are not importing an index that we are also exporting.
681        assert!(
682            !dataflow
683                .index_exports
684                .iter()
685                .map(|(exported_index_id, _)| exported_index_id)
686                .any(|exported_index_id| exported_index_id == index_id)
687        );
688
689        let mut new_usage_types = Vec::new();
690        // Let's see whether something has requested an index on this object that this imported
691        // index is on.
692        if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
693            for (req_idx_id, req_key, req_usage_type) in index_reqs {
694                if req_idx_id == index_id {
695                    soft_assert_eq_or_log!(*req_key, index_desc.key);
696                    new_usage_types.push(req_usage_type.clone());
697                }
698            }
699        }
700        if !new_usage_types.is_empty() {
701            dataflow_metainfo
702                .index_usage_types
703                .insert(*index_id, new_usage_types);
704        }
705    }
706
707    // Prune index imports to only those that are used
708    dataflow
709        .index_imports
710        .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));
711
712    // Determine AccessStrategy::SameDataflow accesses. These were classified as
713    // AccessStrategy::Persist inside collect_index_reqs, so now we check these, and if the id is of
714    // a collection that we are building ourselves, then we adjust the access strategy.
715    let mut objects_to_build_ids = BTreeSet::new();
716    for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
717        objects_to_build_ids.insert(id.clone());
718    }
719    for build_desc in dataflow.objects_to_build.iter_mut() {
720        build_desc
721            .plan
722            .as_inner_mut()
723            .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
724                MirRelationExpr::Get {
725                    id: Id::Global(global_id),
726                    typ: _,
727                    access_strategy,
728                } => match access_strategy {
729                    AccessStrategy::Persist => {
730                        if objects_to_build_ids.contains(global_id) {
731                            *access_strategy = AccessStrategy::SameDataflow;
732                        }
733                    }
734                    _ => {}
735                },
736                _ => {}
737            });
738    }
739
740    // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
741    for build_desc in dataflow.objects_to_build.iter() {
742        build_desc
743            .plan
744            .as_inner()
745            .visit_pre(|expr: &MirRelationExpr| match expr {
746                MirRelationExpr::Get {
747                    id: Id::Global(_),
748                    typ: _,
749                    access_strategy: AccessStrategy::Index(accesses),
750                } => {
751                    for (idx_id, _) in accesses {
752                        soft_assert_or_log!(
753                            dataflow.index_imports.contains_key(idx_id),
754                            "Dangling Get index annotation"
755                        );
756                    }
757                }
758                _ => {}
759            });
760    }
761
762    mz_repr::explain::trace_plan(dataflow);
763
764    Ok(())
765}
766
767/// Pick an index from a given Vec of index keys.
768///
769/// Currently, we pick as follows:
770///  - If there is an index on a unique key, then we pick that. (It might be better distributed, and
771///    is less likely to get dropped than other indexes.)
772///  - Otherwise, we pick an arbitrary index.
773///
774/// TODO: There are various edge cases where a better choice would be possible:
775/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
776///   capture this already.)
777/// - Some indexes might have an error, while others don't.
778///   <https://github.com/MaterializeInc/database-issues/issues/4455>
779/// - Some indexes might have more extra data in their keys (because of being on more complicated
780///   expressions than just column references), which won't be used in a full scan.
781fn choose_index(
782    source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
783    id: &GlobalId,
784    indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
785) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
786    match source_keys.get(id) {
787        None => indexes.iter().next().cloned(), // pick an arbitrary index
788        Some(coll_keys) => match indexes
789            .iter()
790            .find(|(_idx_id, key)| coll_keys.contains(&*key))
791        {
792            Some((idx_id, key)) => Some((*idx_id, key.clone())),
793            None => indexes.iter().next().cloned(), // pick an arbitrary index
794        },
795    }
796}
797
798#[derive(Debug)]
799struct CollectIndexRequests<'a> {
800    /// We were told about these unique keys on sources.
801    source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
802    /// We were told about these indexes being available.
803    indexes_available: &'a dyn IndexOracle,
804    /// We'll be collecting index requests here.
805    index_reqs_by_id:
806        &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
807    /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
808    /// current node (see docs on `IndexUsageContext` about what context we keep).
809    /// Moreover, we need to propagate this context from cte uses to cte definitions.
810    /// `context_across_lets` will keep track of the contexts that reached each use of a LocalId
811    /// added together.
812    context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
813    recursion_guard: RecursionGuard,
814}
815
816impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
817    fn recursion_guard(&self) -> &RecursionGuard {
818        &self.recursion_guard
819    }
820}
821
822impl<'a> CollectIndexRequests<'a> {
823    fn new(
824        source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
825        indexes_available: &'a dyn IndexOracle,
826        index_reqs_by_id: &'a mut BTreeMap<
827            GlobalId,
828            Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
829        >,
830    ) -> CollectIndexRequests<'a> {
831        CollectIndexRequests {
832            source_keys,
833            indexes_available,
834            index_reqs_by_id,
835            context_across_lets: BTreeMap::new(),
836            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
837        }
838    }
839
840    pub fn collect_index_reqs(
841        &mut self,
842        expr: &mut MirRelationExpr,
843    ) -> Result<(), RecursionLimitError> {
844        assert!(self.context_across_lets.is_empty());
845        self.collect_index_reqs_inner(
846            expr,
847            &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
848        )?;
849        assert!(self.context_across_lets.is_empty());
850        // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
851        for (_id, index_reqs) in self.index_reqs_by_id.iter() {
852            for (_, _, index_usage_type) in index_reqs {
853                soft_assert_or_log!(
854                    !matches!(
855                        index_usage_type,
856                        IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
857                    ),
858                    "Delta join Unknown index usage remained"
859                );
860            }
861        }
862        Ok(())
863    }
864
865    fn collect_index_reqs_inner(
866        &mut self,
867        expr: &mut MirRelationExpr,
868        contexts: &Vec<IndexUsageContext>,
869    ) -> Result<(), RecursionLimitError> {
870        self.checked_recur_mut(|this| {
871
872            // If an index exists on `on_id`, this function picks an index to be fully scanned.
873            let pick_index_for_full_scan = |on_id: &GlobalId| {
874                // Note that the choice we make here might be modified later at the
875                // "Adjust FullScans to not introduce a new index dependency".
876                choose_index(
877                    this.source_keys,
878                    on_id,
879                    &this.indexes_available.indexes_on(*on_id).map(
880                        |(idx_id, key)| (idx_id, key.iter().cloned().collect_vec())
881                    ).collect_vec()
882                )
883            };
884
885            // See comment on `IndexUsageContext`.
886            Ok(match expr {
887                MirRelationExpr::Join {
888                    inputs,
889                    implementation,
890                    ..
891                } => {
892                    match implementation {
893                        JoinImplementation::Differential(..) => {
894                            for input in inputs {
895                                this.collect_index_reqs_inner(
896                                    input,
897                                    &IndexUsageContext::from_usage_type(
898                                        IndexUsageType::DifferentialJoin,
899                                    ),
900                                )?;
901                            }
902                        }
903                        JoinImplementation::DeltaQuery(..) => {
904                            // For Delta joins, the first input is special, see
905                            // https://github.com/MaterializeInc/database-issues/issues/2115
906                            this.collect_index_reqs_inner(
907                                &mut inputs[0],
908                                &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)),
909                            )?;
910                            for input in &mut inputs[1..] {
911                                this.collect_index_reqs_inner(
912                                    input,
913                                    &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
914                                        DeltaJoinIndexUsageType::Lookup,
915                                    )),
916                                )?;
917                            }
918                        }
919                        JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
920                            for input in inputs {
921                                this.collect_index_reqs_inner(
922                                    input,
923                                    &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(*idx_id)),
924                                )?;
925                            }
926                        }
927                        JoinImplementation::Unimplemented => {
928                            soft_panic_or_log!(
929                                "CollectIndexRequests encountered an Unimplemented join"
930                            );
931                        }
932                    }
933                }
934                MirRelationExpr::ArrangeBy { input, keys } => {
935                    this.collect_index_reqs_inner(input, &IndexUsageContext::add_keys(contexts, keys))?;
936                }
937                MirRelationExpr::Get {
938                    id: Id::Global(global_id),
939                    access_strategy: persist_or_index,
940                    ..
941                } => {
942                    this.index_reqs_by_id
943                        .entry(*global_id)
944                        .or_insert_with(Vec::new);
945                    // If the context is empty, it means we didn't see an operator that would
946                    // specifically want to use an index for this Get. However, let's still try to
947                    // find an index for a full scan.
948                    let mut try_full_scan = contexts.is_empty();
949                    let mut index_accesses = Vec::new();
950                    for context in contexts {
951                        match &context.requested_keys {
952                            None => {
953                                // We have some index usage context, but didn't see an `ArrangeBy`.
954                                try_full_scan = true;
955                                match context.usage_type {
956                                    IndexUsageType::FullScan | IndexUsageType::SinkExport | IndexUsageType::IndexExport => {
957                                        // Not possible, because these don't go through
958                                        // IndexUsageContext at all.
959                                        unreachable!()
960                                    },
961                                    // You can find more info on why the following join cases
962                                    // shouldn't happen in comments of the Join lowering to LIR.
963                                    IndexUsageType::Lookup(_) => soft_panic_or_log!("CollectIndexRequests encountered an IndexedFilter join without an ArrangeBy"),
964                                    IndexUsageType::DifferentialJoin => soft_panic_or_log!("CollectIndexRequests encountered a Differential join without an ArrangeBy"),
965                                    IndexUsageType::DeltaJoin(_) => soft_panic_or_log!("CollectIndexRequests encountered a Delta join without an ArrangeBy"),
966                                    IndexUsageType::PlanRootNoArrangement => {
967                                        // This is ok: the entire plan is a `Get`, with not even an
968                                        // `ArrangeBy`. Note that if an index exists, the usage will
969                                        // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
970                                        // because we are going into the `try_full_scan` if.
971                                    },
972                                    IndexUsageType::FastPathLimit => {
973                                        // These are created much later, not even inside
974                                        // `prune_and_annotate_dataflow_index_imports`.
975                                        unreachable!()
976                                    }
977                                    IndexUsageType::DanglingArrangeBy => {
978                                        // Not possible, because we create `DanglingArrangeBy`
979                                        // only when we see an `ArrangeBy`.
980                                        unreachable!()
981                                    },
982                                    IndexUsageType::Unknown => {
983                                        // These are added only after `CollectIndexRequests` has run.
984                                        unreachable!()
985                                    }
986                                }
987                            }
988                            Some(requested_keys) => {
989                                for requested_key in requested_keys {
990                                    match this
991                                        .indexes_available
992                                        .indexes_on(*global_id)
993                                        .find(|(available_idx_id, available_key)| {
994                                            match context.usage_type {
995                                                IndexUsageType::Lookup(req_idx_id) => {
996                                                    // `LiteralConstraints` already picked an index
997                                                    // by id. Let's use that one.
998                                                    assert!(!(available_idx_id == &req_idx_id && available_key != &requested_key));
999                                                    available_idx_id == &req_idx_id
1000                                                },
1001                                                _ => available_key == &requested_key,
1002                                            }
1003                                        })
1004                                    {
1005                                        Some((idx_id, key)) => {
1006                                            this.index_reqs_by_id
1007                                                .get_mut(global_id)
1008                                                .unwrap()
1009                                                .push((idx_id, key.to_owned(), context.usage_type.clone()));
1010                                            index_accesses.push((idx_id, context.usage_type.clone()));
1011                                        },
1012                                        None => {
1013                                            // If there is a key requested for which we don't have an
1014                                            // index, then we might still be able to do a full scan of a
1015                                            // differently keyed index.
1016                                            try_full_scan = true;
1017                                        }
1018                                    }
1019                                }
1020                                if requested_keys.is_empty() {
1021                                    // It's a bit weird if an MIR ArrangeBy is not requesting any
1022                                    // key, but let's try a full scan in that case anyhow.
1023                                    try_full_scan = true;
1024                                }
1025                            }
1026                        }
1027                    }
1028                    if try_full_scan {
1029                        // Keep in mind that when having 2 contexts coming from 2 uses of a Let,
1030                        // this code can't distinguish between the case when there is 1 ArrangeBy at the
1031                        // top of the Let, or when the 2 uses each have an `ArrangeBy`. In both cases,
1032                        // we'll add only 1 full scan, which would be wrong in the latter case. However,
1033                        // the latter case can't currently happen until we do
1034                        // https://github.com/MaterializeInc/database-issues/issues/6363
1035                        // Also note that currently we are deduplicating index usage types when
1036                        // printing index usages in EXPLAIN.
1037                        if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
1038                            this.index_reqs_by_id
1039                                .get_mut(global_id)
1040                                .unwrap()
1041                                .push((idx_id, key.to_owned(), IndexUsageType::FullScan));
1042                            index_accesses.push((idx_id, IndexUsageType::FullScan));
1043                        }
1044                    }
1045                    if index_accesses.is_empty() {
1046                        *persist_or_index = AccessStrategy::Persist;
1047                    } else {
1048                        *persist_or_index = AccessStrategy::Index(index_accesses);
1049                    }
1050                }
1051                MirRelationExpr::Get {
1052                    id: Id::Local(local_id),
1053                    ..
1054                } => {
1055                    // Add the current context to the vector of contexts of `local_id`.
1056                    // (The unwrap is safe, because the Let and LetRec cases start with inserting an
1057                    // empty entry.)
1058                    this.context_across_lets
1059                        .get_mut(local_id)
1060                        .unwrap()
1061                        .extend(contexts.iter().cloned());
1062                    // No recursive call here, because Get has no inputs.
1063                }
1064                MirRelationExpr::Let { id, value, body } => {
1065                    let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
1066                    // No shadowing in MIR
1067                    assert_none!(shadowed_context);
1068                    // We go backwards: Recurse on the body and then the value.
1069                    this.collect_index_reqs_inner(body, contexts)?;
1070                    // The above call filled in the entry for `id` in `context_across_lets` (if it
1071                    // was referenced). Anyhow, at least an empty entry should exist, because we started
1072                    // above with inserting it.
1073                    this.collect_index_reqs_inner(
1074                        value,
1075                        &this.context_across_lets[id].clone(),
1076                    )?;
1077                    // Clean up the id from the saved contexts.
1078                    this.context_across_lets.remove(id);
1079                }
1080                MirRelationExpr::LetRec {
1081                    ids,
1082                    values,
1083                    limits: _,
1084                    body,
1085                } => {
1086                    for id in ids.iter() {
1087                        let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
1088                        assert_none!(shadowed_context); // No shadowing in MIR
1089                    }
1090                    // We go backwards: Recurse on the body first.
1091                    this.collect_index_reqs_inner(body, contexts)?;
1092                    // Reset the contexts of the ids (of the current LetRec), because an arrangement
1093                    // from a value can't be used in the body.
1094                    for id in ids.iter() {
1095                        *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
1096                    }
1097                    // Recurse on the values in reverse order.
1098                    // Note that we do only one pass, i.e., we won't see context through a Get that
1099                    // refers to the previous iteration. But this is ok, because we can't reuse
1100                    // arrangements across iterations anyway.
1101                    for (id, value) in ids.iter().zip(values.iter_mut()).rev() {
1102                        this.collect_index_reqs_inner(
1103                            value,
1104                            &this.context_across_lets[id].clone(),
1105                        )?;
1106                    }
1107                    // Clean up the ids from the saved contexts.
1108                    for id in ids {
1109                        this.context_across_lets.remove(id);
1110                    }
1111                }
1112                _ => {
1113                    // Nothing interesting at this node, recurse with the empty context (regardless of
1114                    // what context we got from above).
1115                    let empty_context = Vec::new();
1116                    for child in expr.children_mut() {
1117                        this.collect_index_reqs_inner(child, &empty_context)?;
1118                    }
1119                }
1120            })
1121        })
1122    }
1123}
1124
1125/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
1126/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
1127/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
1128/// looking for is
1129/// ```text
1130/// <operation that uses an index>
1131///   ArrangeBy <requested_keys>
1132///     Get <global_id>
1133/// ```
1134/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
1135/// pattern is present above the `Get`.
1136///
1137/// Note that we usually put this struct in a Vec, because we track context across local let
1138/// bindings, which means that a node can have multiple parents.
1139#[derive(Debug, Clone)]
1140struct IndexUsageContext {
1141    usage_type: IndexUsageType,
1142    requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
1143}
1144
1145impl IndexUsageContext {
1146    pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
1147        vec![IndexUsageContext {
1148            usage_type,
1149            requested_keys: None,
1150        }]
1151    }
1152
1153    // Add the keys of an ArrangeBy into the contexts.
1154    // Soft_panics if haven't already seen something that indicates what the index will be used for.
1155    pub fn add_keys(
1156        old_contexts: &Vec<IndexUsageContext>,
1157        keys_to_add: &Vec<Vec<MirScalarExpr>>,
1158    ) -> Vec<IndexUsageContext> {
1159        let old_contexts = if old_contexts.is_empty() {
1160            // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
1161            soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
1162            // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
1163            // have a place to note down the requested keys below.
1164            IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
1165        } else {
1166            old_contexts.clone()
1167        };
1168        old_contexts
1169            .into_iter()
1170            .flat_map(|old_context| {
1171                if !matches!(
1172                    old_context.usage_type,
1173                    IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
1174                ) {
1175                    // If it's not an unknown delta join usage, then we simply note down the new
1176                    // keys into `requested_keys`.
1177                    let mut context = old_context.clone();
1178                    if context.requested_keys.is_none() {
1179                        context.requested_keys = Some(BTreeSet::new());
1180                    }
1181                    context
1182                        .requested_keys
1183                        .as_mut()
1184                        .unwrap()
1185                        .extend(keys_to_add.iter().cloned());
1186                    Some(context).into_iter().chain(None)
1187                } else {
1188                    // If it's an unknown delta join usage, then we need to figure out whether this
1189                    // is a full scan or a lookup.
1190                    //
1191                    // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
1192                    // scan when starting the rendering of a delta path. This is the one for which
1193                    // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
1194                    //
1195                    // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
1196                    // the `source_key` based on the MIR plan. We do this by doing the same as
1197                    // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
1198                    let source_key = keys_to_add
1199                        .iter()
1200                        .min()
1201                        .expect("ArrangeBy below a delta join has at least one key");
1202                    let full_scan_context = IndexUsageContext {
1203                        requested_keys: Some(BTreeSet::from([source_key.clone()])),
1204                        usage_type: IndexUsageType::DeltaJoin(
1205                            DeltaJoinIndexUsageType::FirstInputFullScan,
1206                        ),
1207                    };
1208                    let lookup_keys = keys_to_add
1209                        .into_iter()
1210                        .filter(|key| *key != source_key)
1211                        .cloned()
1212                        .collect_vec();
1213                    if lookup_keys.is_empty() {
1214                        Some(full_scan_context).into_iter().chain(None)
1215                    } else {
1216                        let lookup_context = IndexUsageContext {
1217                            requested_keys: Some(lookup_keys.into_iter().collect()),
1218                            usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
1219                        };
1220                        Some(full_scan_context)
1221                            .into_iter()
1222                            .chain(Some(lookup_context))
1223                    }
1224                }
1225            })
1226            .collect()
1227    }
1228}
1229
1230/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
1231/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
1232#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
1233pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
1234    /// Notices that the optimizer wants to show to users.
1235    /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
1236    pub optimizer_notices: Vec<Notice>,
1237    /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
1238    /// `prune_and_annotate_dataflow_index_imports`.
1239    pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
1240}
1241
1242impl Default for DataflowMetainfo {
1243    fn default() -> Self {
1244        DataflowMetainfo {
1245            optimizer_notices: Vec::new(),
1246            index_usage_types: BTreeMap::new(),
1247        }
1248    }
1249}
1250
1251impl<Notice> DataflowMetainfo<Notice> {
1252    /// Create a [`UsedIndexes`] instance by resolving each `id` in the
1253    /// `index_ids` iterator against an entry expected to exist in the
1254    /// [`DataflowMetainfo::index_usage_types`].
1255    pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
1256        UsedIndexes::new(
1257            df_desc
1258                .index_imports
1259                .iter()
1260                .map(|(id, _)| {
1261                    let entry = self.index_usage_types.get(id).cloned();
1262                    // If an entry does not exist, mark the usage type for this
1263                    // index as `Unknown`.
1264                    //
1265                    // This should never happen if this method is called after
1266                    // running `prune_and_annotate_dataflow_index_imports` on
1267                    // the dataflow (this happens at the end of the
1268                    // `optimize_dataflow` call).
1269                    let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);
1270
1271                    (*id, index_usage_type)
1272                })
1273                .collect(),
1274        )
1275    }
1276}
1277
1278impl DataflowMetainfo<RawOptimizerNotice> {
1279    /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
1280    /// only if the exact same notice is not already present.
1281    pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
1282    where
1283        T: Into<RawOptimizerNotice>,
1284    {
1285        let notice = notice.into();
1286        if !self.optimizer_notices.contains(&notice) {
1287            self.optimizer_notices.push(notice);
1288        }
1289    }
1290}